Tomoyuki NAKAMURA created FLINK-39975:
-----------------------------------------

             Summary: Flaky DerbyDynamicTableSourceITCase.testLimit: 
JdbcSourceSplitReader does not recover from a connection closed during source 
cancellation
                 Key: FLINK-39975
                 URL: https://issues.apache.org/jira/browse/FLINK-39975
             Project: Flink
          Issue Type: Bug
          Components: Connectors / JDBC
    Affects Versions: 2.1.1, 2.0.1
            Reporter: Tomoyuki NAKAMURA


h3. Symptom

{{DerbyDynamicTableSourceITCase.testLimit}} (and occasionally {{testProject}}) 
fails intermittently in CI with:

{code}
java.lang.RuntimeException: java.sql.SQLNonTransientConnectionException: No 
current connection.
Caused by: java.sql.SQLNonTransientConnectionException: No current connection.  
 (Derby ERROR 08003)
    at org.apache.derby.impl.jdbc.EmbedConnection.checkIfClosed(...)
    at org.apache.derby.impl.jdbc.EmbedConnection.setupContextStack(...)
    at org.apache.derby.impl.jdbc.EmbedStatement.executeStatement(...)
    at org.apache.derby.impl.jdbc.EmbedPreparedStatement.executeQuery(...)
{code}

The fetcher thread reports the exception and, because the job runs with 
{{NoRestartBackoffTimeStrategy}}, the whole job fails. The failure occurs on 
the master branch and is not specific to any one connector PR.

h3. Root cause

{{testLimit}} runs {{SELECT * FROM t LIMIT 1}} over a source partitioned into 2 
splits ({{scan.partition.num=2}}). With {{LIMIT 1}} the job completes after the 
first row and cancels the source while the split-fetcher thread is still
opening the *second* split. Cancellation tears down the JDBC connection, so the 
in-flight {{JdbcSourceSplitReader.openResultSetForSplitWhenAtLeastOnce()}} -> 
{{prepareStatement()}}/{{executeQuery()}} runs against an already-closed
connection and Derby raises {{08003: No current connection}}.

There is a validate-then-use window: the connection that 
{{SimpleJdbcConnectionProvider.getOrEstablishConnection()}} validated is closed 
by the time the reader prepares/executes the statement. The reader does not 
recover from this: the {{SQLException}} is rethrown as a fatal 
{{RuntimeException}} and fails the 
job. The error is connection-level ({{08003}}); the embedded {{memory:}} 
database itself is still up (it is only shut down in {{@AfterAll}}), so the
connection can simply be re-established.

Note: the failing stack trace bottoms out at the split-open call 
({{prepareStatement}}/{{executeQuery}}), not at {{resultSet.next()}}.
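
The validate-then-use window can be forced deterministically with two latches. The sketch below is only an illustration of the interleaving, not connector code: {{ValidateThenUseSketch}}, {{FakeConnection}} and {{runRace()}} are hypothetical names, and the fake connection models nothing beyond a closed flag.

```java
import java.sql.SQLException;
import java.sql.SQLNonTransientConnectionException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;

final class ValidateThenUseSketch {

    /** Hypothetical stand-in for a JDBC connection; only the closed flag is modelled. */
    static final class FakeConnection {
        private volatile boolean closed;

        boolean isClosed() {
            return closed;
        }

        void close() {
            closed = true;
        }

        String executeQuery(String sql) throws SQLException {
            if (closed) {
                throw new SQLNonTransientConnectionException("No current connection.", "08003");
            }
            return "result of " + sql;
        }
    }

    /** Forces the interleaving; returns the SQLState the fetcher saw, or null on success. */
    static String runRace() throws InterruptedException {
        FakeConnection connection = new FakeConnection();
        CountDownLatch validated = new CountDownLatch(1);
        CountDownLatch cancelled = new CountDownLatch(1);
        AtomicReference<String> sqlState = new AtomicReference<>();

        Thread fetcher = new Thread(() -> {
            try {
                if (connection.isClosed()) {
                    return; // not reached here: the connection is still open
                }
                validated.countDown();  // validation passed
                cancelled.await();      // the window in which cancellation runs
                connection.executeQuery("SELECT * FROM t"); // use: connection is gone
            } catch (SQLException e) {
                sqlState.set(e.getSQLState());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        fetcher.start();
        validated.await();
        connection.close();       // stands in for source cancellation
        cancelled.countDown();
        fetcher.join();
        return sqlState.get();
    }
}
```

In the real test the same ordering happens only occasionally, which is why the failure is intermittent.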

h3. How to reproduce

{code}
mvn test -Dtest=DerbyDynamicTableSourceITCase#testLimit -pl 
flink-connector-jdbc-core
{code}

The failure is timing-dependent, so it may take several runs to appear. 
Observed on master ({{af994651}}) and against both Flink 2.0.1 and 2.1.1.

h3. Fix

Make {{JdbcSourceSplitReader}} recover from a connection that was closed while 
opening a split, instead of failing the job:
# Wrap the split-open call ({{openResultSetForSplit}}) in a bounded retry 
({{openResultSetForSplitWithReconnect}}).
# On a {{SQLException}}, retry *only* when the connection is actually closed 
({{connection.isClosed()}}). A genuine query error on a healthy connection is 
rethrown immediately.
# Before retrying, drop the dead connection and the statement/result set left 
on it (so the provider re-establishes a fresh connection and the close helpers 
are not run against the closed connection), then re-open the split.
# The retry is bounded by {{MAX_CONNECTION_RETRIES}} (3); if the database is 
genuinely unreachable, re-establishing keeps failing and the error propagates 
once the budget is exhausted.

This is non-masking: real failures (a query error on a healthy connection, or 
an unreachable database after the retry budget) still fail the job. 
{{wakeUp()}} and {{fetch()}} are left unchanged.
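
The four steps above could be sketched roughly as follows. This is not the actual patch: {{Conn}}, {{Provider}}, {{FakeProvider}} and {{openSplitWithReconnect}} are hypothetical stand-ins invented for illustration, and treating {{MAX_CONNECTION_RETRIES}} as the number of retries after the first attempt (rather than the total number of attempts) is an assumption.

```java
import java.sql.SQLException;
import java.sql.SQLNonTransientConnectionException;

final class ReconnectSketch {
    // Budget taken from the issue text; counting retries rather than attempts is an assumption.
    static final int MAX_CONNECTION_RETRIES = 3;

    /** Hypothetical stand-in for a JDBC connection plus the split-open call. */
    interface Conn {
        boolean isClosed();

        String openSplit(String split) throws SQLException;
    }

    /** Hypothetical stand-in for the connection provider. */
    interface Provider {
        Conn getOrEstablish() throws SQLException;

        void dropConnection();
    }

    static String openSplitWithReconnect(Provider provider, String split) throws SQLException {
        int retries = 0;
        while (true) {
            Conn conn = provider.getOrEstablish();
            try {
                return conn.openSplit(split);
            } catch (SQLException e) {
                // Rethrow query errors on a healthy connection, and give up once the budget is spent.
                if (!conn.isClosed() || retries >= MAX_CONNECTION_RETRIES) {
                    throw e;
                }
                retries++;
                provider.dropConnection(); // the next getOrEstablish() yields a fresh connection
            }
        }
    }

    /** Fake provider whose first {@code deadConnections} connections are already closed. */
    static final class FakeProvider implements Provider {
        int remainingDead;
        int established = 0;
        private Conn current;

        FakeProvider(int deadConnections) {
            this.remainingDead = deadConnections;
        }

        @Override
        public Conn getOrEstablish() {
            if (current == null) {
                final boolean dead = remainingDead > 0;
                if (dead) {
                    remainingDead--;
                }
                established++;
                current = new Conn() {
                    @Override
                    public boolean isClosed() {
                        return dead;
                    }

                    @Override
                    public String openSplit(String split) throws SQLException {
                        if (dead) {
                            throw new SQLNonTransientConnectionException(
                                    "No current connection.", "08003");
                        }
                        return "rows of " + split;
                    }
                };
            }
            return current;
        }

        @Override
        public void dropConnection() {
            current = null;
        }
    }
}
```

With {{new FakeProvider(1)}} the first open fails on the closed connection and the second succeeds on a fresh one. With a provider that only ever hands out closed connections, the original {{08003}} exception propagates after the budget is used up.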

h3. Verification

A deterministic regression test injects an already-closed connection and 
reproduces the exact {{08003}} error (the test fails without the fix and passes 
with it), asserting the reader re-establishes the connection and reads the whole
split. Two more tests cover the immediate-rethrow branch (query error on a 
healthy connection) and the retry-exhaustion branch (gives up after the budget 
instead of looping forever). {{DerbyDynamicTableSourceITCase}} passes repeated
runs.
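
One way to inject an already-closed connection without a database is a {{java.lang.reflect.Proxy}} stub. The sketch below is an assumption about how such a test double could look, not the test shipped with the fix; {{ClosedConnectionSketch}}, {{closedConnection()}} and {{sqlStateOfPrepare()}} are hypothetical names.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.SQLNonTransientConnectionException;

final class ClosedConnectionSketch {

    /** Builds a Connection stub that reports itself closed and rejects every JDBC call. */
    static Connection closedConnection() {
        InvocationHandler handler = (proxy, method, args) -> {
            switch (method.getName()) {
                case "isClosed":
                    return true;
                case "close":
                    return null; // closing a closed connection is a no-op
                case "toString":
                    return "ClosedConnectionStub";
                case "hashCode":
                    return System.identityHashCode(proxy);
                case "equals":
                    return proxy == args[0];
                default:
                    // Mirrors the Derby error text and SQLState quoted in this issue.
                    throw new SQLNonTransientConnectionException(
                            "No current connection.", "08003");
            }
        };
        return (Connection) Proxy.newProxyInstance(
                ClosedConnectionSketch.class.getClassLoader(),
                new Class<?>[] {Connection.class},
                handler);
    }

    /** Returns the SQLState raised by prepareStatement, or null if it succeeded. */
    static String sqlStateOfPrepare(Connection connection) {
        try {
            connection.prepareStatement("SELECT 1");
            return null;
        } catch (SQLException e) {
            return e.getSQLState();
        }
    }
}
```

A reader under test would be handed this stub first and a working connection afterwards, so the test can assert that it reconnects and reads the whole split.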

h3. Note

An earlier cooperative-cancellation attempt ({{wakeUp}} flag + treating a 
shutdown-time {{SQLException}} as a graceful end-of-split in {{fetch()}}) was 
discarded: it could mask a genuine mid-read connection drop as a silently 
finished split, and measurement showed that neither the {{wakeUp()}} flag nor 
the thread interrupt is set when the race fires; only {{connection.isClosed()}} 
is reliable. ({{wakeUp()}} is also invoked by the fetcher for normal 
{{addSplits}}, so it is not a shutdown signal.)

h3. Related

A previous attempt (closed without merge) guarded only {{resultSet.isClosed()}} 
before {{next()}} / {{extract()}} inside the record loop, which does not cover 
the split-open call where this actually fails:
https://github.com/apache/flink-connector-jdbc/pull/191




