Sai Nadendla created FLINK-30431:
------------------------------------

             Summary: JDBC Connector fails to reestablish the lost DB 
connection. 
                 Key: FLINK-30431
                 URL: https://issues.apache.org/jira/browse/FLINK-30431
             Project: Flink
          Issue Type: Bug
          Components: Connectors / JDBC
    Affects Versions: 1.15.0
         Environment: Flink 1.15.2

flink-connector-jdbc: 1.15.0
            Reporter: Sai Nadendla


Our use case with JDBC connector is to sink records to Amazon Redshift DB table.
At some point in time the connection with redshift gets closed and the Flink's 
JDBC connector tries to detect & reestablish the connection in the following 
manner in the @ JdbcOutputFormat.flush() :
{code:java}
1. public synchronized void flush() throws IOException {
2.    ..
3.    
4.    for (int i = 0; i <= executionOptions.getMaxRetries(); i++) {
5.        try {
6.            attemptFlush();
7.            ...
8.        } catch (SQLException e) {
9.            ...
10.            try {
11.                if (!connectionProvider.isConnectionValid()) { <-- TRUE!
12.                    updateExecutor(true);  
13.                }
14.            } catch (Exception exception) {
15.              ....
16.              throw new IOException("Reestablish JDBC connection failed", 
exception);
17.            }
18.            ....
19.        }
20.     }
21.     ....
22. }{code}
updateExecutor() is called (from line#12 of the above code snippet) to close 
statements and re-establish the DB connection. 

 
{code:java}
1. public void updateExecutor(boolean reconnect) throws SQLException, ...{
2.     jdbcStatementExecutor.closeStatements(); 
3.     jdbcStatementExecutor.prepareStatements(
4.             reconnect
5.                     ? connectionProvider.reestablishConnection()
6.                     : connectionProvider.getConnection());
7. } {code}
h4.  
----
h3. Results:
h4. Expected:

The connection should be re-established and the updates should be reflected on 
DB.
h4. Actual:

{color:#de350b}*The {{connection re-establish}} code is never reached/invoked 
!!.*{color} The closeStatements() fails/throws (as the connection is already 
closed).
{noformat}
Caused by: com.amazon.redshift.util.RedshiftException: This connection has been 
closed.
    at 
com.amazon.redshift.jdbc.RedshiftConnectionImpl.checkClosed(RedshiftConnectionImpl.java:1095)
 ~[?:?]
    at 
com.amazon.redshift.jdbc.RedshiftConnectionImpl.cancelQuery(RedshiftConnectionImpl.java:1299)
 ~[?:?]
    at 
com.amazon.redshift.jdbc.RedshiftStatementImpl.cancel(RedshiftStatementImpl.java:1042)
 ~[?:?]
    at 
com.amazon.redshift.jdbc.RedshiftStatementImpl.close(RedshiftStatementImpl.java:748)
 ~[?:?]
    at 
org.apache.flink.connector.jdbc.internal.executor.SimpleBatchStatementExecutor.closeStatements(SimpleBatchStatementExecutor.java:81)
 ~[?:?]
    at 
org.apache.flink.connector.jdbc.internal.JdbcOutputFormat.updateExecutor(JdbcOutputFormat.java:402)
 ~[?:?]
    at 
org.apache.flink.connector.jdbc.internal.JdbcOutputFormat.flush(JdbcOutputFormat.java:226)
 ~[?:?]
    at 
org.apache.flink.connector.jdbc.internal.JdbcOutputFormat.lambda$open$0(JdbcOutputFormat.java:155)
 ~[?:?]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
~[?:1.8.0_292]
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) 
~[?:1.8.0_292]
    at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
 ~[?:1.8.0_292]
    at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
 ~[?:1.8.0_292]
    at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 
~[?:1.8.0_292]
    at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 
~[?:1.8.0_292]
    at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_292]
2022-11-09 03:00:07,510 INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Stopping 
checkpoint coordinator for job 0f70cbc56798e19978b509bf0da0107b.
2022-11-09 03:00:07,517 INFO  
org.apache.flink.runtime.dispatcher.StandaloneDispatcher     [] - Job 
0f70cbc56798e19978b509bf0da0107b reached terminal state FAILED.
{noformat}




 

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to