Tomoyuki NAKAMURA created FLINK-39975:
-----------------------------------------
Summary: Flaky DerbyDynamicTableSourceITCase.testLimit:
JdbcSourceSplitReader does not recover from a connection closed during source
cancellation
Key: FLINK-39975
URL: https://issues.apache.org/jira/browse/FLINK-39975
Project: Flink
Issue Type: Bug
Components: Connectors / JDBC
Affects Versions: 2.1.1, 2.0.1
Reporter: Tomoyuki NAKAMURA
h3. Symptom
{{DerbyDynamicTableSourceITCase.testLimit}} (and occasionally {{testProject}})
fails intermittently in CI with:
{code}
java.lang.RuntimeException: java.sql.SQLNonTransientConnectionException: No current connection.
Caused by: java.sql.SQLNonTransientConnectionException: No current connection. (Derby ERROR 08003)
at org.apache.derby.impl.jdbc.EmbedConnection.checkIfClosed(...)
at org.apache.derby.impl.jdbc.EmbedConnection.setupContextStack(...)
at org.apache.derby.impl.jdbc.EmbedStatement.executeStatement(...)
at org.apache.derby.impl.jdbc.EmbedPreparedStatement.executeQuery(...)
{code}
The fetcher thread reports the exception, and because the job runs with
{{NoRestartBackoffTimeStrategy}}, the whole job fails. The failure reproduces on
the master branch and is not specific to any one connector PR.
h3. Root cause
{{testLimit}} runs {{SELECT * FROM t LIMIT 1}} over a source partitioned into 2
splits ({{scan.partition.num=2}}). With {{LIMIT 1}} the job completes after the
first row and cancels the source while the split-fetcher thread is still
opening the *second* split. Cancellation tears down the JDBC connection, so the
in-flight {{JdbcSourceSplitReader.openResultSetForSplitWhenAtLeastOnce()}} ->
{{prepareStatement()}}/{{executeQuery()}} runs against an already-closed
connection and Derby raises {{08003: No current connection}}.
There is a validate-then-use window: the connection that
{{SimpleJdbcConnectionProvider.getOrEstablishConnection()}} validated is closed
by the time the reader prepares/executes the statement. The reader does not
recover from this: the {{SQLException}} is rethrown as a fatal
{{RuntimeException}} and fails the
job. The error is connection-level ({{08003}}); the embedded {{memory:}}
database itself is still up (it is only shut down in {{@AfterAll}}), so the
connection can simply be re-established.
Note: the failing stack trace bottoms out at the split-open call
({{prepareStatement}}/{{executeQuery}}), not at {{resultSet.next()}}.
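The validate-then-use window can be modelled without a database. The sketch below is illustrative only: {{FakeConnection}}, {{runRace}} and the two latches are hypothetical names, no real JDBC or Flink classes are involved, and the latches force the unlucky interleaving that CI hits only occasionally.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical model of the validate-then-use window; not connector code. */
final class ValidateThenUseSketch {

    /** Minimal stand-in for a JDBC connection. */
    static final class FakeConnection {
        private final AtomicBoolean closed = new AtomicBoolean(false);

        boolean isValid() {
            return !closed.get();
        }

        void close() {
            closed.set(true);
        }

        String executeQuery() {
            if (closed.get()) {
                throw new IllegalStateException("08003: No current connection.");
            }
            return "row";
        }
    }

    /** Runs the forced interleaving and returns what the fetcher thread observed. */
    static String runRace() throws InterruptedException {
        FakeConnection connection = new FakeConnection();
        CountDownLatch validated = new CountDownLatch(1);
        CountDownLatch cancelled = new CountDownLatch(1);
        String[] outcome = new String[1];

        Thread fetcher = new Thread(() -> {
            boolean valid = connection.isValid(); // step 1: validation succeeds
            validated.countDown();
            try {
                cancelled.await(); // the window: cancellation happens here
                outcome[0] = valid ? connection.executeQuery() : "skipped"; // step 2: use
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                outcome[0] = "interrupted";
            } catch (IllegalStateException e) {
                outcome[0] = e.getMessage();
            }
        });
        fetcher.start();

        validated.await();
        connection.close(); // source cancellation tears the connection down
        cancelled.countDown();
        fetcher.join(); // join() makes the fetcher's write to outcome visible
        return outcome[0];
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runRace());
    }
}
```

The validation result is stale by the time the query runs, so checking validity earlier or more often cannot close the window; only reacting to the failure at the point of use can.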
h3. How to reproduce
{code}
mvn test -Dtest=DerbyDynamicTableSourceITCase#testLimit -pl flink-connector-jdbc-core
{code}
The failure is timing-dependent, so it may take several runs to surface.
Observed on master ({{af994651}}) and against both Flink 2.0.1 and 2.1.1.
h3. Fix
Make {{JdbcSourceSplitReader}} recover from a connection that was closed while
opening a split, instead of failing the job:
# Wrap the split-open call ({{openResultSetForSplit}}) in a bounded retry
({{openResultSetForSplitWithReconnect}}).
# On a {{SQLException}}, retry *only* when the connection is actually closed
({{connection.isClosed()}}). A genuine query error on a healthy connection is
rethrown immediately.
# Before retrying, drop the dead connection and the statement/result set left
on it (so the provider re-establishes a fresh connection and the close helpers
are not run against the closed connection), then re-open the split.
# Bound the retry by {{MAX_CONNECTION_RETRIES}} (3); if the database is genuinely
unreachable, re-establishing keeps failing and the error propagates once the
budget is exhausted.
This is non-masking: real failures (a query error on a healthy connection, or
an unreachable database after the retry budget) still fail the job.
{{wakeUp()}} and {{fetch()}} are left unchanged.
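The retry policy described in the steps above can be sketched roughly as follows. This is a minimal illustration, not the connector's actual code: the {{Provider}} and {{SplitOpener}} interfaces, {{dropConnection()}}, and the test doubles are hypothetical stand-ins, and it assumes the budget of 3 means up to three retries after the first attempt.

```java
import java.lang.reflect.Proxy;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.SQLNonTransientConnectionException;

/** Hypothetical sketch of a bounded reconnect-and-retry; not the connector's code. */
final class ReconnectRetrySketch {

    /** Retry budget; assumed here to mean retries after the first attempt. */
    static final int MAX_CONNECTION_RETRIES = 3;

    /** Hypothetical stand-in for the connection provider. */
    interface Provider {
        Connection getOrEstablishConnection() throws SQLException;

        /** Forgets the cached connection so the next call establishes a fresh one. */
        void dropConnection();
    }

    /** Hypothetical stand-in for the split-open call (prepare + execute). */
    interface SplitOpener<T> {
        T open(Connection connection) throws SQLException;
    }

    static <T> T openWithReconnect(Provider provider, SplitOpener<T> opener)
            throws SQLException {
        for (int retries = 0; ; retries++) {
            Connection connection = null;
            try {
                connection = provider.getOrEstablishConnection();
                return opener.open(connection);
            } catch (SQLException e) {
                // An error on a healthy connection is a genuine failure: rethrow at once.
                boolean healthy = connection != null && !connection.isClosed();
                if (healthy || retries >= MAX_CONNECTION_RETRIES) {
                    throw e;
                }
                // Dead (or never established) connection: drop it and try again.
                provider.dropConnection();
            }
        }
    }

    /** Test double: a Connection proxy that only answers isClosed(). */
    static Connection stubConnection(boolean closed) {
        return (Connection) Proxy.newProxyInstance(
                ReconnectRetrySketch.class.getClassLoader(),
                new Class<?>[] {Connection.class},
                (proxy, method, args) -> {
                    if (method.getName().equals("isClosed")) {
                        return closed;
                    }
                    throw new UnsupportedOperationException(method.getName());
                });
    }

    /** Provider double whose first closedFirst connections are already closed. */
    static Provider providerWithClosedConnections(int closedFirst) {
        return new Provider() {
            private int established = 0;
            private Connection cached;

            @Override
            public Connection getOrEstablishConnection() {
                if (cached == null) {
                    cached = stubConnection(established++ < closedFirst);
                }
                return cached;
            }

            @Override
            public void dropConnection() {
                cached = null;
            }
        };
    }

    /** Opener double that mimics the 08003 error on a closed connection. */
    static String openSplit(Connection connection) throws SQLException {
        if (connection.isClosed()) {
            throw new SQLNonTransientConnectionException("No current connection.", "08003");
        }
        return "result-set";
    }

    public static void main(String[] args) throws SQLException {
        // One dead connection: the retry reconnects and the split opens.
        System.out.println(
                openWithReconnect(providerWithClosedConnections(1), ReconnectRetrySketch::openSplit));
    }
}
```

Keying the retry on {{connection.isClosed()}} rather than on the exception type or SQLState keeps the check independent of the database vendor, and the fixed budget guarantees the loop terminates when the database is truly unreachable.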
h3. Verification
A deterministic regression test injects an already-closed connection and
reproduces the exact {{08003}} error (the test fails without the fix and passes
with it), asserting the reader re-establishes the connection and reads the whole
split. Two more tests cover the immediate-rethrow branch (query error on a
healthy connection) and the retry-exhaustion branch (gives up after the budget
instead of looping forever). {{DerbyDynamicTableSourceITCase}} passes repeated
runs.
h3. Note
An earlier cooperative-cancellation attempt ({{wakeUp}} flag + treating a
shutdown-time {{SQLException}} as a graceful end-of-split in {{fetch()}}) was
discarded: it could mask a genuine mid-read connection drop as a
silently-finished split, and measurement showed that neither the {{wakeUp}}
flag nor the thread interrupt is set when
the race fires; only {{connection.isClosed()}} is reliable. ({{wakeUp()}} is
also invoked by the fetcher for normal {{addSplits}}, so it is not a
shutdown signal.)
h3. Related
A previous attempt (closed without merge) guarded only {{resultSet.isClosed()}}
before {{next()}} / {{extract()}} inside the record loop, which does not cover
the split-open call where this actually fails:
https://github.com/apache/flink-connector-jdbc/pull/191