Sai Nadendla created FLINK-30431:
------------------------------------
Summary: JDBC Connector fails to reestablish the lost DB
connection.
Key: FLINK-30431
URL: https://issues.apache.org/jira/browse/FLINK-30431
Project: Flink
Issue Type: Bug
Components: Connectors / JDBC
Affects Versions: 1.15.0
Environment: Flink 1.15.2
flink-connector-jdbc: 1.15.0
Reporter: Sai Nadendla
Our use case with JDBC connector is to sink records to Amazon Redshift DB table.
At some point in time the connection with redshift gets closed and the Flink's
JDBC connector tries to detect & reestablish the connection in the following
manner in the @ JdbcOutputFormat.flush() :
{code:java}
1. public synchronized void flush() throws IOException {
2. ..
3.
4. for (int i = 0; i <= executionOptions.getMaxRetries(); i++) {
5. try {
6. attemptFlush();
7. ...
8. } catch (SQLException e) {
9. ...
10. try {
11. if (!connectionProvider.isConnectionValid()) { <-- TRUE!
12. updateExecutor(true);
13. }
14. } catch (Exception exception) {
15. ....
16. throw new IOException("Reestablish JDBC connection failed",
exception);
17. }
18. ....
19. }
20. }
21. ....
22. }{code}
updateExecutor() is called (from line#12 of the above code snippet) to close
statements and re-establish the DB connection.
{code:java}
1. public void updateExecutor(boolean reconnect) throws SQLException, ...{
2. jdbcStatementExecutor.closeStatements();
3. jdbcStatementExecutor.prepareStatements(
4. reconnect
5. ? connectionProvider.reestablishConnection()
6. : connectionProvider.getConnection());
7. } {code}
h4.
----
h3. Results:
h4. Expected:
The connection should be re-established and the updates should be reflected on
DB.
h4. Actual:
{color:#de350b}*The {{connection re-establish}} code is never reached/invoked
!!.*{color} The closeStatements() fails/throws (as the connection is already
closed).
{noformat}
Caused by: com.amazon.redshift.util.RedshiftException: This connection has been
closed.
at
com.amazon.redshift.jdbc.RedshiftConnectionImpl.checkClosed(RedshiftConnectionImpl.java:1095)
~[?:?]
at
com.amazon.redshift.jdbc.RedshiftConnectionImpl.cancelQuery(RedshiftConnectionImpl.java:1299)
~[?:?]
at
com.amazon.redshift.jdbc.RedshiftStatementImpl.cancel(RedshiftStatementImpl.java:1042)
~[?:?]
at
com.amazon.redshift.jdbc.RedshiftStatementImpl.close(RedshiftStatementImpl.java:748)
~[?:?]
at
org.apache.flink.connector.jdbc.internal.executor.SimpleBatchStatementExecutor.closeStatements(SimpleBatchStatementExecutor.java:81)
~[?:?]
at
org.apache.flink.connector.jdbc.internal.JdbcOutputFormat.updateExecutor(JdbcOutputFormat.java:402)
~[?:?]
at
org.apache.flink.connector.jdbc.internal.JdbcOutputFormat.flush(JdbcOutputFormat.java:226)
~[?:?]
at
org.apache.flink.connector.jdbc.internal.JdbcOutputFormat.lambda$open$0(JdbcOutputFormat.java:155)
~[?:?]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
~[?:1.8.0_292]
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
~[?:1.8.0_292]
at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
~[?:1.8.0_292]
at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
~[?:1.8.0_292]
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
~[?:1.8.0_292]
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
~[?:1.8.0_292]
at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_292]
2022-11-09 03:00:07,510 INFO
org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Stopping
checkpoint coordinator for job 0f70cbc56798e19978b509bf0da0107b.
2022-11-09 03:00:07,517 INFO
org.apache.flink.runtime.dispatcher.StandaloneDispatcher [] - Job
0f70cbc56798e19978b509bf0da0107b reached terminal state FAILED.
{noformat}
--
This message was sent by Atlassian Jira
(v8.20.10#820010)