Flink CDC Issue Import created FLINK-34781:
----------------------------------------------
Summary: [Bug] [oracle] oracle real-time synchronization is slow
Key: FLINK-34781
URL: https://issues.apache.org/jira/browse/FLINK-34781
Project: Flink
Issue Type: Bug
Components: Flink CDC
Reporter: Flink CDC Issue Import
### Search before asking
- [X] I searched in the [issues|https://github.com/ververica/flink-cdc-connectors/issues] and found nothing similar.
### Flink version
Flink 1.15.3
### Flink CDC version
2.3-SNAPSHOT
### Database and its version
Oracle 11
### Minimal reproduce step
During Oracle real-time synchronization, changed data takes a long time to be synchronized, typically about 10 minutes. The delay is worst when the Oracle instance contains a large number of schemas and tables, and it can cause Akka timeouts.
### What did you expect to see?
```java
// Proposed: restrict the ALL_TABLES scan to the configured tables.
StringBuilder queryTablesSql =
        new StringBuilder(
                "SELECT OWNER, TABLE_NAME, TABLESPACE_NAME FROM ALL_TABLES \n"
                        + "WHERE TABLESPACE_NAME IS NOT NULL AND TABLESPACE_NAME NOT IN ('SYSTEM','SYSAUX')");
if (tableList != null && !tableList.isEmpty()) {
    StringJoiner stringJoiner = new StringJoiner(",");
    for (String tableId : tableList) {
        // Escape embedded single quotes so each name stays a valid SQL string literal.
        stringJoiner.add("'" + tableId.replace("'", "''") + "'");
    }
    queryTablesSql
            .append(" AND TABLE_NAME IN (")
            .append(stringJoiner.toString())
            .append(")");
}
try {
    jdbcConnection.query(
            queryTablesSql.toString(),
            rs -> {
                while (rs.next()) {
                    String schemaName = rs.getString(1);
                    String tableName = rs.getString(2);
                    TableId tableId =
                            new TableId(jdbcConnection.database(), schemaName, tableName);
                    tableIdSet.add(tableId);
                }
            });
} catch (SQLException e) {
    LOG.warn("SQL execute error, sql: {}", queryTablesSql, e);
}
```
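The proposal puts every configured table into a single `IN (...)` list. Oracle rejects an `IN` list with more than 1000 expressions (ORA-01795), so a job capturing many tables would need the list split. The following is a minimal, self-contained sketch, not Flink CDC code: the class `FilteredTableQuery` and the method `buildQuery` are hypothetical names, and it assumes the list holds bare table names, as in the proposal above. It splits the names into groups of at most 1000 and joins the groups with `OR`.

```java
import java.util.List;
import java.util.StringJoiner;

/** Hypothetical helper (not part of Flink CDC): builds the ALL_TABLES query with an optional name filter. */
public class FilteredTableQuery {
    // Oracle allows at most 1000 expressions in one IN list (ORA-01795).
    static final int MAX_IN_LIST = 1000;

    static String buildQuery(List<String> tableNames) {
        StringBuilder sql =
                new StringBuilder(
                        "SELECT OWNER, TABLE_NAME, TABLESPACE_NAME FROM ALL_TABLES "
                                + "WHERE TABLESPACE_NAME IS NOT NULL AND TABLESPACE_NAME NOT IN ('SYSTEM','SYSAUX')");
        if (tableNames == null || tableNames.isEmpty()) {
            // No filter configured: fall back to scanning all user tables, as the current code does.
            return sql.toString();
        }
        // Each chunk becomes "TABLE_NAME IN (...)"; chunks are OR-ed inside one parenthesized group.
        StringJoiner orJoiner = new StringJoiner(" OR ", " AND (", ")");
        for (int start = 0; start < tableNames.size(); start += MAX_IN_LIST) {
            List<String> chunk =
                    tableNames.subList(start, Math.min(start + MAX_IN_LIST, tableNames.size()));
            StringJoiner inList = new StringJoiner(",", "TABLE_NAME IN (", ")");
            for (String name : chunk) {
                // Double any single quote so the name is a valid SQL string literal.
                inList.add("'" + name.replace("'", "''") + "'");
            }
            orJoiner.add(inList.toString());
        }
        return sql.append(orJoiner.toString()).toString();
    }

    public static void main(String[] args) {
        System.out.println(buildQuery(List.of("ORDERS", "CUSTOMERS")));
    }
}
```

Like the proposal, this filters on `TABLE_NAME` only, so a table with the same name in another schema is also returned; also filtering on `OWNER` would require the schema part of each configured table identifier.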
### What did you see instead?
```java
// Current: scans every table in ALL_TABLES outside SYSTEM/SYSAUX, regardless of the configured table list.
String queryTablesSql =
        "SELECT OWNER, TABLE_NAME, TABLESPACE_NAME FROM ALL_TABLES \n"
                + "WHERE TABLESPACE_NAME IS NOT NULL AND TABLESPACE_NAME NOT IN ('SYSTEM','SYSAUX')";
try {
    jdbcConnection.query(
            queryTablesSql,
            rs -> {
                while (rs.next()) {
                    String schemaName = rs.getString(1);
                    String tableName = rs.getString(2);
                    TableId tableId =
                            new TableId(jdbcConnection.database(), schemaName, tableName);
                    tableIdSet.add(tableId);
                }
            });
} catch (SQLException e) {
    LOG.warn("SQL execute error, sql: {}", queryTablesSql, e);
}
```
### Anything else?
Restricting the query by table name greatly reduces the query time. On the Oracle database in my current project, the original query took about 1 minute and returned more than 300,000 rows, which consumed a large amount of memory. After adding the table-name condition, the query completes in milliseconds and returns only a small number of rows, which also reduces memory consumption.
### Are you willing to submit a PR?
- [X] I'm willing to submit a PR!
---------------- Imported from GitHub ----------------
Url: https://github.com/apache/flink-cdc/issues/1999
Created by: [weAreFriendYo|https://github.com/weAreFriendYo]
Labels: bug
Created at: Wed Mar 15 15:05:26 CST 2023
State: open