tangshangwen created FLINK-16708:
------------------------------------
Summary: When a JDBC connection has been closed, the retry policy of the JDBCUpsertOutputFormat cannot take effect and may result in data loss
Key: FLINK-16708
URL: https://issues.apache.org/jira/browse/FLINK-16708
Project: Flink
Issue Type: Bug
Components: Connectors / JDBC
Affects Versions: 1.10.0
Reporter: tangshangwen
In our test environment, I used the tcpkill command to simulate a scenario in which the PostgreSQL connection is closed. I found that the retry strategy of the flush method did not take effect. On the second attempt, flush could not recognize that the connection had been closed. The reason is that PgStatement clears its batchStatements before it checks whether the connection is closed. The second executeBatch call therefore finds batchStatements empty and returns normally, so the records of the failed batch are never written.
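The ordering described above can be reproduced without a database. The following is a hypothetical model, not the pgjdbc or Flink source. `FakeBatchStatement`, `closeConnection` and `flushWithRetry` are invented names. The assumption that the batch is cleared before the closed-connection check, and that an empty batch returns normally, comes from this report.

```java
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;

public class BatchClearedBeforeCheckDemo {

    /** Hypothetical stand-in for a JDBC statement; NOT the pgjdbc PgStatement source. */
    public static class FakeBatchStatement {
        private final List<String> batch = new ArrayList<>();
        private boolean connectionClosed;

        public void addBatch(String row) {
            batch.add(row);
        }

        /** Simulates the connection being killed (e.g. by tcpkill). */
        public void closeConnection() {
            connectionClosed = true;
        }

        public int[] executeBatch() throws SQLException {
            // Assumed ordering from the report: the pending batch is detached and
            // cleared BEFORE the connection state is checked.
            List<String> toSend = new ArrayList<>(batch);
            batch.clear();
            if (toSend.isEmpty()) {
                // Empty batch: returns normally without ever checking the connection.
                return new int[0];
            }
            if (connectionClosed) {
                throw new SQLException("This connection has been closed.");
            }
            return new int[toSend.size()]; // one update count per row
        }
    }

    /** Simplified retry loop in the spirit of JDBCUpsertOutputFormat.flush (hypothetical). */
    public static int flushWithRetry(FakeBatchStatement stmt, int maxRetries) {
        for (int attempt = 1; attempt <= maxRetries; attempt++) {
            try {
                return stmt.executeBatch().length; // rows reported as written
            } catch (SQLException e) {
                System.err.println("executeBatch error, retry times = " + attempt + ": " + e.getMessage());
            }
        }
        throw new IllegalStateException("retries exhausted");
    }

    public static void main(String[] args) {
        FakeBatchStatement stmt = new FakeBatchStatement();
        stmt.addBatch("row-1");
        stmt.addBatch("row-2");
        stmt.closeConnection();
        int written = flushWithRetry(stmt, 3);
        System.out.println("rows written: " + written); // prints "rows written: 0"
    }
}
```

Running `main` logs one error for attempt 1 and then prints `rows written: 0`. The retry loop reports success on attempt 2 even though neither row was written.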
{code:java}
2020-03-20 21:16:18.246 [jdbc-upsert-output-format-thread-1] ERROR org.apache.flink.api.java.io.jdbc.JDBCUpsertOutputFormat - JDBC executeBatch error, retry times = 1
org.postgresql.util.PSQLException: This connection has been closed.
    at org.postgresql.jdbc.PgConnection.checkClosed(PgConnection.java:857)
    at org.postgresql.jdbc.PgConnection.getAutoCommit(PgConnection.java:817)
    at org.postgresql.jdbc.PgStatement.internalExecuteBatch(PgStatement.java:813)
    at org.postgresql.jdbc.PgStatement.executeBatch(PgStatement.java:873)
    at org.postgresql.jdbc.PgPreparedStatement.executeBatch(PgPreparedStatement.java:1569)
    at org.apache.flink.api.java.io.jdbc.writer.AppendOnlyWriter.executeBatch(AppendOnlyWriter.java:62)
    at org.apache.flink.api.java.io.jdbc.JDBCUpsertOutputFormat.flush(JDBCUpsertOutputFormat.java:159)
    at org.apache.flink.api.java.io.jdbc.JDBCUpsertOutputFormat.lambda$open$0(JDBCUpsertOutputFormat.java:124)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
2020-03-20 21:16:21.247 [jdbc-upsert-output-format-thread-1] ERROR org.apache.flink.api.java.io.jdbc.JDBCUpsertOutputFormat - JDBC executeBatch error, retry times = 1
org.postgresql.util.PSQLException: This connection has been closed.
    at org.postgresql.jdbc.PgConnection.checkClosed(PgConnection.java:857)
    at org.postgresql.jdbc.PgConnection.getAutoCommit(PgConnection.java:817)
    at org.postgresql.jdbc.PgStatement.internalExecuteBatch(PgStatement.java:813)
    at org.postgresql.jdbc.PgStatement.executeBatch(PgStatement.java:873)
    at org.postgresql.jdbc.PgPreparedStatement.executeBatch(PgPreparedStatement.java:1569)
    at org.apache.flink.api.java.io.jdbc.writer.AppendOnlyWriter.executeBatch(AppendOnlyWriter.java:62)
    at org.apache.flink.api.java.io.jdbc.JDBCUpsertOutputFormat.flush(JDBCUpsertOutputFormat.java:159)
    at org.apache.flink.api.java.io.jdbc.JDBCUpsertOutputFormat.lambda$open$0(JDBCUpsertOutputFormat.java:124)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
{code}
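One possible mitigation is for the writer to keep its own copy of the pending rows. On every attempt it replays those rows into the statement, and it replaces the statement after any failure or when the connection reports itself invalid (similar in spirit to `java.sql.Connection.isValid(int)`). This is a sketch under assumptions, not Flink API, and not necessarily how the issue was resolved upstream. `BatchStatement`, `ReplayingBatchWriter`, `pendingRows` and the `reconnect` supplier are hypothetical names. The back-off sleep between retries is omitted.

```java
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

public class ReplayingBatchWriter {

    /** Hypothetical minimal statement abstraction; not a JDBC or Flink type. */
    public interface BatchStatement {
        void addBatch(String row) throws SQLException;

        int[] executeBatch() throws SQLException;

        /** Similar in spirit to java.sql.Connection.isValid(int). */
        boolean isConnectionValid();
    }

    // Hypothetical factory that opens a new connection and prepares a new statement.
    private final Supplier<BatchStatement> reconnect;
    // Writer-owned copy of rows not yet confirmed by the database.
    private final List<String> buffer = new ArrayList<>();
    private BatchStatement stmt;

    public ReplayingBatchWriter(Supplier<BatchStatement> reconnect) {
        this.reconnect = reconnect;
        this.stmt = reconnect.get();
    }

    public void write(String row) {
        buffer.add(row);
    }

    public int pendingRows() {
        return buffer.size();
    }

    public int flush(int maxRetries) throws SQLException {
        if (maxRetries < 1) {
            throw new IllegalArgumentException("maxRetries must be >= 1");
        }
        SQLException last = null;
        for (int attempt = 1; attempt <= maxRetries; attempt++) {
            try {
                if (stmt == null || !stmt.isConnectionValid()) {
                    stmt = reconnect.get(); // replace a dead or discarded statement
                }
                // Re-add every pending row, so a driver that cleared its internal
                // batch during a failed attempt cannot turn this retry into a no-op.
                for (String row : buffer) {
                    stmt.addBatch(row);
                }
                int written = stmt.executeBatch().length;
                buffer.clear(); // drop rows only after the batch succeeded
                return written;
            } catch (SQLException e) {
                last = e;
                stmt = null; // batch state is unknown after a failure: start fresh next attempt
            }
        }
        throw last;
    }
}
```

Discarding the statement after any failure avoids replaying rows into a batch whose state is unknown, which could otherwise write duplicates. The cost is one reconnect per failed attempt.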