[
https://issues.apache.org/jira/browse/FLINK-36793?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17904713#comment-17904713
]
林健昌 commented on FLINK-36793:
-----------------------------
Could you assign this issue to me?
The root cause is that comparing two ROWIDs in Oracle SQL and comparing them
with the ROWID.compareBytes() method in the code can produce different
results, so an intermediate chunk is mistaken for the last chunk. The
resulting last chunk can then be extremely large.
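To make the mismatch concrete, here is a minimal sketch (not part of the
patch) of how a byte-wise comparison of ROWID strings can disagree with
Oracle's own ordering. It assumes Oracle's documented base-64 alphabet for
displayed ROWIDs, ordered A-Z, a-z, 0-9, +, /, so a digit character encodes a
larger value than any letter even though its ASCII code is smaller; the ROWID
literals below are made up.
{code:java}
// Minimal sketch: byte-wise order of ROWID strings vs. Oracle's base-64 order.
// Assumption: displayed ROWIDs use the alphabet A-Z, a-z, 0-9, +, /
// (values 0..63), while compareBytes-style logic compares raw ASCII bytes.
public class RowidOrderSketch {

    // Oracle's base-64 alphabet for displayed ROWIDs (assumed here).
    private static final String ALPHABET =
            "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";

    /** Byte-wise comparison, as compareBytes-style logic performs. */
    static int compareBytes(byte[] a, byte[] b) {
        for (int i = 0; i < Math.min(a.length, b.length); i++) {
            int c = (a[i] & 0xff) - (b[i] & 0xff);
            if (c != 0) {
                return c;
            }
        }
        return a.length - b.length;
    }

    /** Comparison by base-64 digit value, matching Oracle's ordering. */
    static int compareBase64(String a, String b) {
        for (int i = 0; i < Math.min(a.length(), b.length()); i++) {
            int c = ALPHABET.indexOf(a.charAt(i)) - ALPHABET.indexOf(b.charAt(i));
            if (c != 0) {
                return c;
            }
        }
        return a.length() - b.length();
    }

    public static void main(String[] args) {
        // Hypothetical 18-character ROWID strings differing in one position:
        // '0' has a smaller ASCII code than 'a' but a larger base-64 value.
        String chunkEnd = "AAAR5bAAHAAAACx0AA";
        String max      = "AAAR5bAAHAAAACxaAA";

        // Byte-wise: chunkEnd < max ('0' = ASCII 48 < 'a' = ASCII 97).
        System.out.println(compareBytes(chunkEnd.getBytes(), max.getBytes()));

        // Base-64 value: chunkEnd > max ('0' = 52 > 'a' = 26).
        System.out.println(compareBase64(chunkEnd, max));
    }
} {code}
In this sketch the byte comparison says chunkEnd < max while Oracle's
ordering says chunkEnd > max; a disagreement in either direction is enough
for isChunkEndGeMax to misjudge where the table ends.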
{code:java}
// Original code
/** ChunkEnd greater than or equal to max. */
private boolean isChunkEndGeMax(Object chunkEnd, Object max) {
    boolean chunkEndMaxCompare;
    if (chunkEnd instanceof ROWID && max instanceof ROWID) {
        // Byte-wise comparison of the ROWID representations, which does not
        // always match the ordering Oracle applies to ROWIDs in SQL.
        chunkEndMaxCompare =
                ROWID.compareBytes(((ROWID) chunkEnd).getBytes(), ((ROWID) max).getBytes()) >= 0;
    } else {
        chunkEndMaxCompare = chunkEnd != null && ObjectUtils.compare(chunkEnd, max) >= 0;
    }
    return chunkEndMaxCompare;
} {code}
{code:java}
// Fixed code
/** ChunkEnd greater than or equal to max. */
private boolean isChunkEndGeMax(JdbcConnection jdbc, Object chunkEnd, Object max)
        throws SQLException {
    if (chunkEnd instanceof ROWID && max instanceof ROWID) {
        // Let Oracle itself order the two ROWIDs, so the result matches the
        // ordering used by the chunk queries. The larger ROWID comes first.
        String query =
                "SELECT CHARTOROWID(?) ROWIDS FROM DUAL "
                        + "UNION SELECT CHARTOROWID(?) ROWIDS FROM DUAL "
                        + "ORDER BY ROWIDS DESC";
        return jdbc.prepareQueryAndMap(
                query,
                ps -> {
                    ps.setObject(1, chunkEnd.toString());
                    ps.setObject(2, max.toString());
                },
                rs -> {
                    if (rs.next()) {
                        Object obj = rs.getObject(1);
                        // chunkEnd >= max if it sorts first, or if the two are
                        // equal (UNION de-duplicates, leaving a single row).
                        return obj.toString().equals(chunkEnd.toString())
                                || chunkEnd.toString().equals(max.toString());
                    } else {
                        throw new RuntimeException("Failed to compare ROWID values");
                    }
                });
    } else {
        return chunkEnd != null && ObjectUtils.compare(chunkEnd, max) >= 0;
    }
} {code}
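The fix delegates the comparison to the database: CHARTOROWID converts both
string forms back to ROWIDs, and ORDER BY ROWIDS DESC makes Oracle return the
larger one first, so the splitter sees exactly the ordering Oracle uses when
executing the chunk queries. The trade-off is one extra round trip to the
database per comparison.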
> There is a problem with the chunk splitter logic of the Oracle CDC incremental
> snapshot, causing a split chunk to be too large.
> -----------------------------------------------------------------------------------------------------------------------------
>
> Key: FLINK-36793
> URL: https://issues.apache.org/jira/browse/FLINK-36793
> Project: Flink
> Issue Type: Improvement
> Components: Flink CDC
> Reporter: 林健昌
> Priority: Minor
> Original Estimate: 12h
> Remaining Estimate: 12h
>
> During an Oracle incremental snapshot, the ChunkSplitter logic that splits
> chunks based on ROWID is flawed, which can produce a very large chunk and
> cause a TaskManager out-of-memory error.