[https://issues.apache.org/jira/browse/FLINK-24626?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel]
Kenyore updated FLINK-24626:
----------------------------
Description:
The JDBC sink can lose data when TableBufferReducedStatementExecutor is used
together with a left join.
Here is a snippet of executeBatch:
{code:java}
@Override
public void executeBatch() throws SQLException {
    for (Map.Entry<RowData, Tuple2<Boolean, RowData>> entry : reduceBuffer.entrySet()) {
        if (entry.getValue().f0) {
            // flag == true (INSERT / UPDATE_AFTER): upsert the buffered row
            upsertExecutor.addToBatch(entry.getValue().f1);
        } else {
            // flag == false (DELETE / UPDATE_BEFORE): delete by key
            deleteExecutor.addToBatch(entry.getKey());
        }
    }
    // All upserts are executed before all deletes, regardless of arrival order.
    upsertExecutor.executeBatch();
    deleteExecutor.executeBatch();
    reduceBuffer.clear();
}
{code}
A left join generates a DELETE row before the corresponding upsert row, but
executeBatch runs all buffered upserts first and all deletes afterwards, so the
two are applied in the wrong order. This may cause data loss.
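One way to avoid the reordering is to keep the buffer in arrival order and execute the pending batch whenever the buffered operations switch between upsert and delete. The following is a minimal, hypothetical sketch, not the fix adopted in Flink: it uses String keys and values instead of RowData, and `BatchExecutor`, `RecordingExecutor` and `OrderPreservingBuffer` are illustrative stand-ins for the JDBC statement executors.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical, simplified stand-in for a JDBC batch statement executor. */
interface BatchExecutor {
    void addToBatch(String item);

    void executeBatch();
}

/** Test double that appends "<op> <item>" to a shared log when its batch runs. */
class RecordingExecutor implements BatchExecutor {
    private final String op;
    private final List<String> log;
    private final List<String> pending = new ArrayList<>();

    RecordingExecutor(String op, List<String> log) {
        this.op = op;
        this.log = log;
    }

    @Override
    public void addToBatch(String item) {
        pending.add(item);
    }

    @Override
    public void executeBatch() {
        for (String item : pending) {
            log.add(op + " " + item);
        }
        pending.clear();
    }
}

/** Reduce-by-key buffer that flushes upserts and deletes in the order they arrived. */
class OrderPreservingBuffer {
    private static final class Change {
        final boolean upsert; // true for INSERT/UPDATE_AFTER, false for DELETE/UPDATE_BEFORE
        final String value;

        Change(boolean upsert, String value) {
            this.upsert = upsert;
            this.value = value;
        }
    }

    // LinkedHashMap iterates in insertion order.
    private final Map<String, Change> reduceBuffer = new LinkedHashMap<>();
    private final BatchExecutor upsertExecutor;
    private final BatchExecutor deleteExecutor;

    OrderPreservingBuffer(BatchExecutor upsertExecutor, BatchExecutor deleteExecutor) {
        this.upsertExecutor = upsertExecutor;
        this.deleteExecutor = deleteExecutor;
    }

    void add(String key, boolean upsert, String value) {
        // Remove first so a re-buffered key moves to the position of its latest change.
        reduceBuffer.remove(key);
        reduceBuffer.put(key, new Change(upsert, value));
    }

    void executeBatch() {
        Boolean runKind = null; // kind of the run currently being batched
        for (Map.Entry<String, Change> entry : reduceBuffer.entrySet()) {
            Change change = entry.getValue();
            if (runKind != null && runKind != change.upsert) {
                // The kind switched: execute the previous run before starting the next one.
                executorFor(runKind).executeBatch();
            }
            runKind = change.upsert;
            if (change.upsert) {
                upsertExecutor.addToBatch(change.value);
            } else {
                deleteExecutor.addToBatch(entry.getKey()); // delete by key
            }
        }
        if (runKind != null) {
            executorFor(runKind).executeBatch();
        }
        reduceBuffer.clear();
    }

    private BatchExecutor executorFor(boolean upsert) {
        return upsert ? upsertExecutor : deleteExecutor;
    }
}
```

Flushing at every switch trades some batching for ordering: a buffer that alternates between upserts and deletes degenerates into many one-row batches.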
was:
The JDBC sink can lose data when using
TableBufferReducedStatementExecutor.
Here are some snippets:
{code:java}
@Override
public void addToBatch(RowData record) throws SQLException {
    RowData key = keyExtractor.apply(record);
    if (record.getRowKind() == RowKind.DELETE) {
        //XXX cut delete off because the retract stream would generate
        return;
    }
    boolean flag = changeFlag(record.getRowKind());
    RowData value = valueTransform.apply(record); // copy or not
    reduceBuffer.put(key, Tuple2.of(flag, value));
}

private boolean changeFlag(RowKind rowKind) {
    switch (rowKind) {
        case INSERT:
        case UPDATE_AFTER:
            return true;
        case DELETE:
        case UPDATE_BEFORE:
            return false;
        default:
            throw new UnsupportedOperationException(
                    String.format(
                            "Unknown row kind, the supported row kinds is: INSERT, UPDATE_BEFORE, UPDATE_AFTER,"
                                    + " DELETE, but get: %s.",
                            rowKind));
    }
}
{code}
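Because reduceBuffer is keyed, successive changes to the same key collapse into one entry, and only the last flag and value survive. The hypothetical sketch below replays that reduction with String keys and values instead of RowData; `RowKind` is redefined locally with the same four constants as Flink's enum, `ReduceBufferDemo` is an illustrative name, and the reporter's early return for DELETE rows is deliberately left out.

```java
import java.util.AbstractMap;
import java.util.HashMap;
import java.util.Map;

/** Local stand-in for Flink's RowKind, with the same four constants. */
enum RowKind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

class ReduceBufferDemo {
    // Mirrors changeFlag above: true means "upsert", false means "delete by key".
    static boolean changeFlag(RowKind rowKind) {
        switch (rowKind) {
            case INSERT:
            case UPDATE_AFTER:
                return true;
            case DELETE:
            case UPDATE_BEFORE:
                return false;
            default:
                throw new UnsupportedOperationException("Unknown row kind: " + rowKind);
        }
    }

    // key -> (flag, value); a later change for the same key overwrites the earlier one.
    private final Map<String, Map.Entry<Boolean, String>> reduceBuffer = new HashMap<>();

    void addToBatch(RowKind kind, String key, String value) {
        reduceBuffer.put(key, new AbstractMap.SimpleEntry<>(changeFlag(kind), value));
    }

    Map.Entry<Boolean, String> pending(String key) {
        return reduceBuffer.get(key);
    }

    int size() {
        return reduceBuffer.size();
    }
}
```

Within one key only the last change reaches executeBatch, so the upsert-then-delete ordering matters for changes buffered under different keys.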
The code above stores the result of changeFlag in the Tuple2 as the marker for
upsert (true) or delete (false).
{code:java}
@Override
public void executeBatch() throws SQLException {
    for (Map.Entry<RowData, Tuple2<Boolean, RowData>> entry : reduceBuffer.entrySet()) {
        if (entry.getValue().f0) {
            // flag == true (INSERT / UPDATE_AFTER): upsert the buffered row
            upsertExecutor.addToBatch(entry.getValue().f1);
        } else {
            // flag == false (DELETE / UPDATE_BEFORE): delete by key
            deleteExecutor.addToBatch(entry.getKey());
        }
    }
    // All upserts are executed before all deletes, regardless of arrival order.
    upsertExecutor.executeBatch();
    deleteExecutor.executeBatch();
    reduceBuffer.clear();
}
{code}
executeBatch executes all rows with a false flag (deletes) after all rows with a
true flag (upserts). This means an UPDATE_BEFORE could be executed after an
UPDATE_AFTER, and data can be lost as a result.
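The reordering can be replayed without a database. In the hypothetical sketch below, `SplitExecutionDemo` and `executionOrder` are illustrative names; it uses a LinkedHashMap with String keys so the arrival order is explicit (the buffer in Flink may not iterate in arrival order at all) and returns the statements in the order the two-executor strategy of executeBatch would send them.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class SplitExecutionDemo {
    /**
     * Replays the two-executor strategy of executeBatch on a buffer of
     * key -> flag (true = upsert, false = delete), in the buffer's iteration order.
     */
    static List<String> executionOrder(LinkedHashMap<String, Boolean> reduceBuffer) {
        List<String> upserts = new ArrayList<>();
        List<String> deletes = new ArrayList<>();
        for (Map.Entry<String, Boolean> entry : reduceBuffer.entrySet()) {
            if (entry.getValue()) {
                upserts.add("UPSERT " + entry.getKey());
            } else {
                deletes.add("DELETE " + entry.getKey());
            }
        }
        // upsertExecutor.executeBatch() runs first, then deleteExecutor.executeBatch().
        List<String> executed = new ArrayList<>(upserts);
        executed.addAll(deletes);
        return executed;
    }
}
```

A delete that arrived first is still executed last, for example: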
> Flink JDBC Sink may lose data in left join
> ------------------------------------------
>
> Key: FLINK-24626
> URL: https://issues.apache.org/jira/browse/FLINK-24626
> Project: Flink
> Issue Type: Bug
> Components: Connectors / JDBC
> Reporter: Kenyore
> Priority: Major
--
This message was sent by Atlassian Jira
(v8.3.4#803005)