[
https://issues.apache.org/jira/browse/FLINK-30431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17648307#comment-17648307
]
luoyuxia commented on FLINK-30431:
----------------------------------
Thanks for reporting. I think we should fix it. Otherwise, the reconnection
will never happen.
> JDBC Connector fails to reestablish the lost DB connection.
> ------------------------------------------------------------
>
> Key: FLINK-30431
> URL: https://issues.apache.org/jira/browse/FLINK-30431
> Project: Flink
> Issue Type: Bug
> Components: Connectors / JDBC
> Affects Versions: 1.15.0
> Environment: Flink 1.15.2
> flink-connector-jdbc: 1.15.0
> Reporter: Sai Nadendla
> Priority: Blocker
>
> Our use case for the JDBC connector is to sink records into an Amazon Redshift
> table.
> At some point the connection to Redshift gets closed, and Flink's JDBC
> connector tries to detect and re-establish the connection in
> JdbcOutputFormat.flush() as follows:
> {code:java}
> 1.  public synchronized void flush() throws IOException {
> 2.      // ...
> 3.
> 4.      for (int i = 0; i <= executionOptions.getMaxRetries(); i++) {
> 5.          try {
> 6.              attemptFlush();
> 7.              // ...
> 8.          } catch (SQLException e) {
> 9.              // ...
> 10.             try {
> 11.                 if (!connectionProvider.isConnectionValid()) { // <-- evaluates to TRUE
> 12.                     updateExecutor(true);
> 13.                 }
> 14.             } catch (Exception exception) {
> 15.                 // ...
> 16.                 throw new IOException("Reestablish JDBC connection failed", exception);
> 17.             }
> 18.             // ...
> 19.         }
> 20.     }
> 21.     // ...
> 22. }{code}
> updateExecutor(true) is called (line 12 of the snippet above) to close the old
> statements and re-establish the DB connection:
>
> {code:java}
> public void updateExecutor(boolean reconnect) throws SQLException, ... {
>     jdbcStatementExecutor.closeStatements();
>     jdbcStatementExecutor.prepareStatements(
>             reconnect
>                     ? connectionProvider.reestablishConnection()
>                     : connectionProvider.getConnection());
> }{code}
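In the updateExecutor() above, closeStatements() runs before a new connection is requested, so when it throws on an already-closed connection, reestablishConnection() is never called. One possible direction for a fix, sketched here only as an illustration and not as the change that was eventually made upstream: on the reconnect path, treat a failure while closing the old statements as expected and continue to reestablishConnection(). The StatementExecutor and ConnectionProvider interfaces and the simulateClosedConnection() helper are hypothetical stand-ins that only borrow the method names from the snippet; the connection is modelled as a plain String so the example runs without a database or driver.

```java
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;

public class TolerantReconnectSketch {

    /** Hypothetical stand-in for the connector's statement executor. */
    interface StatementExecutor {
        void closeStatements() throws SQLException;

        void prepareStatements(String connection) throws SQLException;
    }

    /** Hypothetical stand-in for the connection provider; a String models the connection. */
    interface ConnectionProvider {
        String getConnection() throws SQLException;

        String reestablishConnection() throws SQLException;
    }

    /** Variant of updateExecutor() that still reconnects if closing the old statements fails. */
    static void updateExecutor(
            boolean reconnect, StatementExecutor executor, ConnectionProvider provider)
            throws SQLException {
        try {
            executor.closeStatements();
        } catch (SQLException e) {
            if (!reconnect) {
                throw e; // outside the reconnect path, keep the original behavior
            }
            // On the reconnect path the old connection is assumed to be dead already,
            // so this failure is ignored and execution falls through to reconnecting.
        }
        executor.prepareStatements(
                reconnect ? provider.reestablishConnection() : provider.getConnection());
    }

    /** Reproduces the reported situation: closeStatements() throws on a closed connection. */
    static List<String> simulateClosedConnection() {
        List<String> events = new ArrayList<>();
        StatementExecutor executor =
                new StatementExecutor() {
                    @Override
                    public void closeStatements() throws SQLException {
                        events.add("closeStatements failed");
                        throw new SQLException("This connection has been closed.");
                    }

                    @Override
                    public void prepareStatements(String connection) {
                        events.add("prepared on " + connection);
                    }
                };
        ConnectionProvider provider =
                new ConnectionProvider() {
                    @Override
                    public String getConnection() {
                        return "old-connection";
                    }

                    @Override
                    public String reestablishConnection() {
                        events.add("reconnected");
                        return "new-connection";
                    }
                };
        try {
            updateExecutor(true, executor, provider);
        } catch (SQLException e) {
            throw new IllegalStateException("reconnect path should not fail here", e);
        }
        return events;
    }

    public static void main(String[] args) {
        System.out.println(simulateClosedConnection());
    }
}
```

Running main prints [closeStatements failed, reconnected, prepared on new-connection]: the reconnect still happens even though closing the old statements failed. Whether that SQLException should be silently dropped or at least logged is a design decision left open by this sketch.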
> ----
> h3. Results:
> h4. Expected:
> The connection should be re-established and the updates should be reflected
> in the DB.
> h4. Actual:
> {color:#de350b}*The connection re-establishment code ({{reestablishConnection()}}) is never
> reached.*{color} {{closeStatements()}} throws first, because the connection is already
> closed.
> {noformat}
> Caused by: com.amazon.redshift.util.RedshiftException: This connection has been closed.
>     at com.amazon.redshift.jdbc.RedshiftConnectionImpl.checkClosed(RedshiftConnectionImpl.java:1095) ~[?:?]
>     at com.amazon.redshift.jdbc.RedshiftConnectionImpl.cancelQuery(RedshiftConnectionImpl.java:1299) ~[?:?]
>     at com.amazon.redshift.jdbc.RedshiftStatementImpl.cancel(RedshiftStatementImpl.java:1042) ~[?:?]
>     at com.amazon.redshift.jdbc.RedshiftStatementImpl.close(RedshiftStatementImpl.java:748) ~[?:?]
>     at org.apache.flink.connector.jdbc.internal.executor.SimpleBatchStatementExecutor.closeStatements(SimpleBatchStatementExecutor.java:81) ~[?:?]
>     at org.apache.flink.connector.jdbc.internal.JdbcOutputFormat.updateExecutor(JdbcOutputFormat.java:402) ~[?:?]
>     at org.apache.flink.connector.jdbc.internal.JdbcOutputFormat.flush(JdbcOutputFormat.java:226) ~[?:?]
>     at org.apache.flink.connector.jdbc.internal.JdbcOutputFormat.lambda$open$0(JdbcOutputFormat.java:155) ~[?:?]
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[?:1.8.0_292]
>     at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) ~[?:1.8.0_292]
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) ~[?:1.8.0_292]
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) ~[?:1.8.0_292]
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_292]
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_292]
>     at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_292]
> 2022-11-09 03:00:07,510 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Stopping checkpoint coordinator for job 0f70cbc56798e19978b509bf0da0107b.
> 2022-11-09 03:00:07,517 INFO  org.apache.flink.runtime.dispatcher.StandaloneDispatcher [] - Job 0f70cbc56798e19978b509bf0da0107b reached terminal state FAILED.
> {noformat}
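After the failure inside flush(), the job goes straight to terminal state FAILED. The simplified model below is a sketch, not Flink's actual code, and it suggests why: the IOException thrown on line 16 of the quoted flush() leaves the retry loop at once, so the remaining retries never run even when maxRetries is greater than zero. The Hooks interface, the simulateClosedConnection() helper, and the message thrown after the last retry are hypothetical; the parts the quoted snippet elides (backoff, logging) are simply left out.

```java
import java.io.IOException;
import java.sql.SQLException;

public class FlushRetrySketch {

    /** Hypothetical hooks standing in for attemptFlush(), isConnectionValid(), updateExecutor(true). */
    interface Hooks {
        void attemptFlush() throws SQLException;

        boolean isConnectionValid() throws SQLException;

        void reconnect() throws SQLException;
    }

    /** Simplified model of the quoted retry loop; returns how many attempts were needed. */
    static int flush(int maxRetries, Hooks hooks) throws IOException {
        for (int i = 0; i <= maxRetries; i++) {
            try {
                hooks.attemptFlush();
                return i + 1;
            } catch (SQLException e) {
                try {
                    if (!hooks.isConnectionValid()) {
                        hooks.reconnect();
                    }
                } catch (Exception exception) {
                    // Escapes the for loop immediately: the remaining retries never run.
                    throw new IOException("Reestablish JDBC connection failed", exception);
                }
            }
        }
        // Hypothetical message; the quoted snippet elides what happens here.
        throw new IOException("Flush failed after " + (maxRetries + 1) + " attempts");
    }

    /** Every flush fails and reconnect() throws, as closeStatements() does in the report. */
    static String simulateClosedConnection(int maxRetries) {
        int[] attempts = {0};
        Hooks hooks =
                new Hooks() {
                    @Override
                    public void attemptFlush() throws SQLException {
                        attempts[0]++;
                        throw new SQLException("This connection has been closed.");
                    }

                    @Override
                    public boolean isConnectionValid() {
                        return false;
                    }

                    @Override
                    public void reconnect() throws SQLException {
                        throw new SQLException("This connection has been closed.");
                    }
                };
        try {
            return "flushed after " + flush(maxRetries, hooks) + " attempt(s)";
        } catch (IOException e) {
            return e.getMessage() + " after " + attempts[0] + " attempt(s)";
        }
    }

    public static void main(String[] args) {
        System.out.println(simulateClosedConnection(3));
    }
}
```

With maxRetries set to 3, the model reports "Reestablish JDBC connection failed after 1 attempt(s)": a single failing reconnect is enough to fail the flush, which matches the job failing instead of retrying.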
>
>