[ 
https://issues.apache.org/jira/browse/FLINK-38846?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

张皓然 updated FLINK-38846:
------------------------
    Description: 
shyiko's BinaryLogClient uses WriteRowsEventDataDeserializer and 
DeleteRowsEventDataDeserializer to deserialize the binlog events produced by 
INSERT and DELETE statements.

For some reason, both deserializers use a *LinkedList* to store the 
deserialized rows:

 
{code:java}
private List<Serializable[]> deserializeRows(long tableId, BitSet includedColumns,
        ByteArrayInputStream inputStream) throws IOException {
    List<Serializable[]> result = new LinkedList<Serializable[]>();
    while (inputStream.available() > 0) {
        result.add(deserializeRow(tableId, includedColumns, inputStream));
    }
    return result;
} {code}
In Debezium's MySqlStreamingChangeEventSource, handleChange() then visits every 
row produced by the EventDataDeserializer with an *index-based for loop*:
{code:java}
for (int row = startingRowNumber; row != numRows; ++row) {
     offsetContext.setRowNumber(row, numRows);
     offsetContext.event(tableId, eventTimestamp);
     changeEmitter.emit(tableId, rows.get(row));
     count++;
} {code}
Since LinkedList.get(row) is O(n), this traversal is O(n^2) overall, making 
CDC slower and slower when a big transaction produces many rows.
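To illustrate the cost, here is a minimal stand-alone sketch (class and method names are mine, not from either codebase) that runs the same indexed loop shape over a LinkedList and an ArrayList:

```java
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;

public class IndexedGetDemo {
    // Same loop shape as handleChange(): indexed get() over the rows list.
    // On LinkedList, each get(row) walks the list from one end, so the whole
    // pass is O(n^2); on ArrayList, each get(row) is O(1), so the pass is O(n).
    static long traverse(List<Integer> rows) {
        long sum = 0;
        for (int row = 0; row != rows.size(); ++row) {
            sum += rows.get(row);
        }
        return sum;
    }

    public static void main(String[] args) {
        List<Integer> linked = new LinkedList<>();
        List<Integer> array = new ArrayList<>();
        for (int i = 0; i < 50_000; i++) {
            linked.add(i);
            array.add(i);
        }
        long t0 = System.nanoTime();
        traverse(linked);
        System.out.println("LinkedList pass: " + (System.nanoTime() - t0) / 1_000_000 + " ms");
        t0 = System.nanoTime();
        traverse(array);
        System.out.println("ArrayList pass:  " + (System.nanoTime() - t0) / 1_000_000 + " ms");
    }
}
```

Even at 50k rows the LinkedList pass is dramatically slower, and the gap widens quadratically with the row count.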

I recommend replacing the LinkedList with an ArrayList; I'd like to work on this.
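The change itself would be a one-liner in deserializeRows(). A runnable stand-in sketch (using a plain Integer per "row" instead of Serializable[], just to keep it self-contained):

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class ArrayListRows {
    // Same loop shape as deserializeRows(), but backed by ArrayList so that
    // the later indexed traversal in handleChange() costs O(1) per get().
    // Each "row" here is one byte from the stream, standing in for a
    // deserialized Serializable[] row.
    static List<Integer> deserializeRows(ByteArrayInputStream inputStream) throws IOException {
        List<Integer> result = new ArrayList<>(); // was: new LinkedList<>()
        while (inputStream.available() > 0) {
            result.add(inputStream.read());
        }
        return result;
    }

    public static void main(String[] args) throws IOException {
        List<Integer> rows = deserializeRows(new ByteArrayInputStream(new byte[] {1, 2, 3}));
        System.out.println(rows); // [1, 2, 3]
    }
}
```

Since the row count is unknown up front, ArrayList's amortized O(1) append still makes the fill loop O(n) overall, and it removes the per-node allocation overhead of LinkedList as a bonus.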

 

 

  was:
shyiko's BinaryLogClient uses WriteRowsEventDataDeserializer and 
DeleteRowsEventDataDeserializer to deserialize the binlog events produced by 
INSERT and DELETE statements.

For some reason, both deserializers use a *LinkedList* to store the 
deserialized rows:

 
{code:java}
private List<Serializable[]> deserializeRows(long tableId, BitSet includedColumns,
        ByteArrayInputStream inputStream) throws IOException {
    List<Serializable[]> result = new LinkedList<Serializable[]>();
    while (inputStream.available() > 0) {
        result.add(deserializeRow(tableId, includedColumns, inputStream));
    }
    return result;
} {code}
In Debezium's MySqlStreamingChangeEventSource, handleChange() then visits every 
row produced by the EventDataDeserializer with an *index-based for loop*:
{code:java}
for (int row = startingRowNumber; row != numRows; ++row) {
     offsetContext.setRowNumber(row, numRows);
     offsetContext.event(tableId, eventTimestamp);
     changeEmitter.emit(tableId, rows.get(row));
     count++;
} {code}
Since LinkedList.get(row) is O(n), this traversal is O(n^2) overall, making 
CDC slower and slower when a big transaction produces many rows.

I recommend replacing the LinkedList with an ArrayList and the count; I'd like 
to work on this.

 

 


> Binlog client use LinkedList to store binlog content
> ----------------------------------------------------
>
>                 Key: FLINK-38846
>                 URL: https://issues.apache.org/jira/browse/FLINK-38846
>             Project: Flink
>          Issue Type: Improvement
>          Components: Flink CDC
>            Reporter: 张皓然
>            Priority: Minor
>              Labels: improvement
>   Original Estimate: 48h
>  Remaining Estimate: 48h
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
