[
https://issues.apache.org/jira/browse/FLINK-36649?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Leonard Xu reassigned FLINK-36649:
----------------------------------
Assignee: Di Wu
> Oracle When reading via OracleIncrementalSource, the connection is
> occasionally closed
> --------------------------------------------------------------------------------------
>
> Key: FLINK-36649
> URL: https://issues.apache.org/jira/browse/FLINK-36649
> Project: Flink
> Issue Type: Bug
> Components: Flink CDC
> Affects Versions: cdc-3.2.0, cdc-3.1.1, cdc-3.3.0
> Reporter: Di Wu
> Assignee: Di Wu
> Priority: Major
> Labels: CDC, pull-request-available
>
> When reading from Oracle via OracleIncrementalSource, the connection is
> occasionally closed.
>
> {code:java}
> 14:57:56,432 INFO
> org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceSplitReader
> [Source Data Fetcher for Source: doris_source[1] -> SinkConversion[2] ->
> Sink: Print to Std. Out (1/1)#0] [] - Close snapshot reader
> org.apache.flink.cdc.connectors.base.source.reader.external.IncrementalSourceScanFetcher
> 14:57:56,597 INFO io.debezium.jdbc.JdbcConnection
> [pool-14-thread-1] [] - Connection gracefully closed
> 14:57:56,602 ERROR io.debezium.connector.oracle.logminer.LogMinerHelper
> [debezium-snapshot-reader-0] [] - Mining session stopped due to the
> java.sql.SQLException: Closed Resultset: getLong
> 14:57:56,603 ERROR io.debezium.pipeline.ErrorHandler
> [debezium-snapshot-reader-0] [] - Producer failure
> java.sql.SQLException: Closed Resultset: getLong
> at
> oracle.jdbc.driver.GeneratedScrollableResultSet.getLong(GeneratedScrollableResultSet.java:254)
> ~[ojdbc8-19.3.0.0.jar:19.3.0.0.0]
> at
> io.debezium.connector.oracle.OracleConnection.lambda$getSessionStatisticByName$10(OracleConnection.java:373)
> ~[debezium-connector-oracle-1.9.8.Final.jar:1.9.8.Final]
> at io.debezium.jdbc.JdbcConnection.queryAndMap(JdbcConnection.java:642)
> ~[debezium-core-1.9.8.Final.jar:1.9.8.Final]
> at io.debezium.jdbc.JdbcConnection.queryAndMap(JdbcConnection.java:510)
> ~[debezium-core-1.9.8.Final.jar:1.9.8.Final]
> at
> io.debezium.connector.oracle.OracleConnection.getSessionStatisticByName(OracleConnection.java:372)
> ~[debezium-connector-oracle-1.9.8.Final.jar:1.9.8.Final]
> at
> io.debezium.connector.oracle.logminer.LogMinerStreamingChangeEventSource.captureSessionMemoryStatistics(LogMinerStreamingChangeEventSource.java:353)
> ~[flink-connector-oracle-cdc/:?]
> at
> io.debezium.connector.oracle.logminer.LogMinerStreamingChangeEventSource.execute(LogMinerStreamingChangeEventSource.java:258)
> ~[flink-connector-oracle-cdc/:?]
> at
> org.apache.flink.cdc.connectors.oracle.source.reader.fetch.OracleStreamFetchTask$RedoLogSplitReadTask.execute(OracleStreamFetchTask.java:139)
> ~[flink-connector-oracle-cdc/:?]
> at
> org.apache.flink.cdc.connectors.oracle.source.reader.fetch.OracleScanFetchTask.executeBackfillTask(OracleScanFetchTask.java:106)
> ~[flink-connector-oracle-cdc/:?]
> at
> org.apache.flink.cdc.connectors.base.source.reader.external.AbstractScanFetchTask.execute(AbstractScanFetchTask.java:112)
> ~[flink-cdc-base/:?]
> at
> org.apache.flink.cdc.connectors.base.source.reader.external.IncrementalSourceScanFetcher.lambda$submitTask$1(IncrementalSourceScanFetcher.java:99)
> ~[flink-cdc-base/:?]
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> [?:1.8.0_322]
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> [?:1.8.0_322]
> at java.lang.Thread.run(Thread.java:750) [?:1.8.0_322] {code}
>
> *Reason:*
>
> After a split has been read, its reader is closed, which also closes the
> JDBC connection. At that moment LogMinerStreamingChangeEventSource may still
> be running captureSessionMemoryStatistics, which queries session statistics
> over the same connection. The failure surfaces in
> JdbcConnection.queryAndMap:
>
> {code:java}
> public <T> T queryAndMap(String query, StatementFactory statementFactory,
>         ResultSetMapper<T> mapper) throws SQLException {
>     Objects.requireNonNull(mapper, "Mapper must be provided");
>     Connection conn = connection(); // Checks whether the connection is open
>     try (Statement statement = statementFactory.createStatement(conn);) {
>         if (LOGGER.isTraceEnabled()) {
>             LOGGER.trace("running '{}'", query);
>         }
>         try (ResultSet resultSet = statement.executeQuery(query);) {
>             // If the split reader's close() closes the connection at this
>             // point, the mapper fails with a SQLException.
>             return mapper.apply(resultSet);
>         }
>     }
> } {code}
>
> *Solution:*
> -1. Create a new connection before calling
> *captureSessionMemoryStatistics(connection)*. This is slow, though: in my
> local test it took 6 seconds.-
> 2. Since *captureSessionMemoryStatistics* only collects statistics, I think
> the call can be moved before {*}process{*}, so that the connection is no
> longer in use when the split reader closes.
>
>
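The ordering argument in option 2 can be illustrated with a small
single-threaded sketch. It is only a model of the idea, not the Debezium or
Flink CDC code: FakeConnection, StatsOrderingSketch and both method names are
hypothetical, and the real problem is a race between threads, which is
simplified here to "processing the last event closes the connection".

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the shared JDBC connection; not a Debezium class.
final class FakeConnection {
    private boolean closed;

    void close() {
        closed = true;
    }

    // Mimics a statistics query that fails once the connection is closed.
    long querySessionStatistic() {
        if (closed) {
            throw new IllegalStateException("Closed Resultset: getLong");
        }
        return 1024L;
    }
}

final class StatsOrderingSketch {

    // Order described in the report: process first, then capture statistics.
    static List<String> captureAfterProcess(FakeConnection conn, Runnable process) {
        List<String> log = new ArrayList<>();
        process.run();
        log.add("processed");
        try {
            log.add("stat=" + conn.querySessionStatistic());
        } catch (IllegalStateException e) {
            log.add("failed: " + e.getMessage());
        }
        return log;
    }

    // Proposed order: capture statistics first, then process.
    static List<String> captureBeforeProcess(FakeConnection conn, Runnable process) {
        List<String> log = new ArrayList<>();
        log.add("stat=" + conn.querySessionStatistic());
        process.run();
        log.add("processed");
        return log;
    }

    public static void main(String[] args) {
        // Assumption: processing the last event ends the split, and the
        // split reader then closes the connection.
        FakeConnection first = new FakeConnection();
        System.out.println(captureAfterProcess(first, first::close));

        FakeConnection second = new FakeConnection();
        System.out.println(captureBeforeProcess(second, second::close));
    }
}
```

In this model the first call records a failed statistics query, while the
second finishes the query before the connection is closed. Whether the same
holds in the connector depends on where the close is actually triggered.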
--
This message was sent by Atlassian Jira
(v8.20.10#820010)