[
https://issues.apache.org/jira/browse/FLINK-39975?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Yuepeng Pan updated FLINK-39975:
--------------------------------
Fix Version/s: jdbc-3.4.0, jdbc-5.0.0, jdbc-6.0.0
> Flaky DerbyDynamicTableSourceITCase.testLimit: JdbcSourceSplitReader does not
> recover from a connection closed during source cancellation
> -----------------------------------------------------------------------------------------------------------------------------------------
>
> Key: FLINK-39975
> URL: https://issues.apache.org/jira/browse/FLINK-39975
> Project: Flink
> Issue Type: Bug
> Components: Connectors / JDBC
> Affects Versions: 2.0.1, 2.1.1
> Reporter: Tomoyuki NAKAMURA
> Priority: Critical
> Labels: pull-request-available, test-stability
> Fix For: jdbc-3.4.0, jdbc-5.0.0, jdbc-6.0.0
>
>
> h3. Symptom
> {{DerbyDynamicTableSourceITCase.testLimit}} (and occasionally
> {{testProject}}) fails intermittently in CI with:
> {code}
> java.lang.RuntimeException: java.sql.SQLNonTransientConnectionException: No
> current connection.
> Caused by: java.sql.SQLNonTransientConnectionException: No current
> connection. (Derby ERROR 08003)
> at org.apache.derby.impl.jdbc.EmbedConnection.checkIfClosed(...)
> at org.apache.derby.impl.jdbc.EmbedConnection.setupContextStack(...)
> at org.apache.derby.impl.jdbc.EmbedStatement.executeStatement(...)
> at org.apache.derby.impl.jdbc.EmbedPreparedStatement.executeQuery(...)
> {code}
> The fetcher thread reports the exception, and because the job runs with
> {{NoRestartBackoffTimeStrategy}}, the whole job fails. The failure occurs on
> the master branch and is not specific to any one connector PR.
> h3. Root cause
> {{testLimit}} runs {{SELECT * FROM t LIMIT 1}} over a source partitioned into
> 2 splits ({{scan.partition.num=2}}). With {{LIMIT 1}} the job completes after
> the first row and cancels the source while the split-fetcher thread is still
> opening the *second* split. Cancellation tears down the JDBC connection, so
> the in-flight
> {{JdbcSourceSplitReader.openResultSetForSplitWhenAtLeastOnce()}} ->
> {{prepareStatement()}}/{{executeQuery()}} runs against an already-closed
> connection and Derby raises {{08003: No current connection}}.
> There is a validate-then-use window: the connection that
> {{SimpleJdbcConnectionProvider.getOrEstablishConnection()}} validated is
> closed by the time the reader prepares/executes the statement. The reader
> does not recover from this: the {{SQLException}} is rethrown as a fatal
> {{RuntimeException}} and fails
> the job. The error is connection-level ({{08003}}); the embedded {{memory:}}
> database itself is still up (it is only shut down in {{@AfterAll}}), so the
> connection can simply be re-established.
> Note: the failing stack trace bottoms out at the split-open call
> ({{prepareStatement}}/{{executeQuery}}), not at {{resultSet.next()}}.
> h3. How to reproduce
> {code}
> mvn test -Dtest=DerbyDynamicTableSourceITCase#testLimit -pl
> flink-connector-jdbc-core
> {code}
> The failure is timing-dependent, so it only surfaces across repeated runs.
> Observed on master ({{af994651}}) and against both Flink 2.0.1 and 2.1.1.
> h3. Fix
> Make {{JdbcSourceSplitReader}} recover from a connection that was closed
> while opening a split, instead of failing the job:
> # Wrap the split-open call ({{openResultSetForSplit}}) in a bounded retry
> ({{openResultSetForSplitWithReconnect}}).
> # On a {{SQLException}}, retry *only* when the connection is actually closed
> ({{connection.isClosed()}}). A genuine query error on a healthy connection is
> rethrown immediately.
> # Before retrying, drop the dead connection and the statement/result set left
> on it (so the provider re-establishes a fresh connection and the close
> helpers are not run against the closed connection), then re-open the split.
> # The retry is bounded by {{MAX_CONNECTION_RETRIES}} (3); if the database is
> genuinely unreachable, re-establishing keeps failing and the error propagates
> once the budget is exhausted.
> This is non-masking: real failures (a query error on a healthy connection, or
> an unreachable database after the retry budget) still fail the job.
> {{wakeUp()}} and {{fetch()}} are left unchanged.
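> The retry policy in the steps above can be sketched in Java as follows. This
> is a minimal illustration, not the actual patch: {{ReconnectSketch}},
> {{ConnectionSource}}, {{SplitOpen}}, {{openWithReconnect}} and
> {{fakeConnection}} are hypothetical names, and only the
> {{MAX_CONNECTION_RETRIES}} value (3) and the {{connection.isClosed()}} check
> come from this description. The sketch assumes the budget counts retries
> after the first attempt, and that a failure to re-establish the connection is
> retried under the same budget.

```java
import java.lang.reflect.Proxy;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.SQLNonTransientConnectionException;

/** Illustrative sketch only; these are not the connector's real classes. */
class ReconnectSketch {
    // Value taken from the issue text; counting it as retries after the
    // first attempt is an assumption of this sketch.
    static final int MAX_CONNECTION_RETRIES = 3;

    /** Hypothetical stand-in for the connection provider. */
    interface ConnectionSource {
        Connection getOrEstablish() throws SQLException;

        /** Forget the cached connection so the next call reconnects. */
        void drop();
    }

    /** Hypothetical stand-in for the split-open call. */
    interface SplitOpen<T> {
        T open(Connection connection) throws SQLException;
    }

    static <T> T openWithReconnect(ConnectionSource source, SplitOpen<T> opener)
            throws SQLException {
        int retries = 0;
        while (true) {
            Connection connection = null;
            try {
                connection = source.getOrEstablish();
                return opener.open(connection);
            } catch (SQLException e) {
                // Retry only when the connection is gone; an error on a live
                // connection is a genuine failure and is rethrown as-is.
                boolean connectionLost = connection == null || isClosed(connection);
                if (!connectionLost || retries >= MAX_CONNECTION_RETRIES) {
                    throw e;
                }
                retries++;
                source.drop(); // discard the dead connection before re-opening
            }
        }
    }

    private static boolean isClosed(Connection connection) {
        try {
            return connection.isClosed();
        } catch (SQLException e) {
            return true; // assumption: a connection that cannot answer is dead
        }
    }

    /** Test double: a Connection proxy that only answers isClosed(). */
    static Connection fakeConnection(boolean closed) {
        return (Connection) Proxy.newProxyInstance(
                ReconnectSketch.class.getClassLoader(),
                new Class<?>[] {Connection.class},
                (proxy, method, args) -> {
                    if (method.getName().equals("isClosed")) {
                        return closed;
                    }
                    throw new UnsupportedOperationException(method.getName());
                });
    }

    public static void main(String[] args) throws SQLException {
        // Simulate the race: the cached connection is already closed.
        Connection[] cached = {fakeConnection(true)};
        ConnectionSource source = new ConnectionSource() {
            @Override
            public Connection getOrEstablish() {
                if (cached[0] == null) {
                    cached[0] = fakeConnection(false);
                }
                return cached[0];
            }

            @Override
            public void drop() {
                cached[0] = null;
            }
        };
        String result = openWithReconnect(source, connection -> {
            if (connection.isClosed()) {
                throw new SQLNonTransientConnectionException("No current connection.", "08003");
            }
            return "split opened";
        });
        System.out.println(result);
    }
}
```

> In {{main}}, the first open attempt hits the already-closed connection and
> fails with {{08003}}; the sketch drops that connection, obtains a fresh one
> and the second attempt succeeds. A query error on a healthy connection, or a
> connection that stays closed past the budget, still propagates to the caller.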
> h3. Verification
> A deterministic regression test injects an already-closed connection and
> reproduces the exact {{08003}} error (the test fails without the fix and
> passes with it), asserting that the reader re-establishes the connection and
> reads the whole split. Two more tests cover the immediate-rethrow branch
> (query error on a healthy connection) and the retry-exhaustion branch (gives
> up after the budget instead of looping forever).
> {{DerbyDynamicTableSourceITCase}} passes on repeated runs.
> h3. Note
> An earlier cooperative-cancellation attempt ({{wakeUp}} flag + treating a
> shutdown-time {{SQLException}} as a graceful end-of-split in {{fetch()}}) was
> discarded: it could mask a genuine mid-read connection drop as a
> silently-finished split, and measurement showed {{wakeUp()}}/thread-interrupt
> are *not* set when the race fires; only {{connection.isClosed()}} is
> reliable. ({{wakeUp()}} is also invoked by the fetcher for normal
> {{addSplits}}, so it is not a shutdown signal.)
> h3. Related
> A previous attempt (closed without merge) guarded only
> {{resultSet.isClosed()}} before {{next()}} / {{extract()}} inside the record
> loop, which does not cover the split-open call where this actually fails:
> https://github.com/apache/flink-connector-jdbc/pull/191
--
This message was sent by Atlassian Jira
(v8.20.10#820010)