[
https://issues.apache.org/jira/browse/FLINK-12494?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Flink Jira Bot updated FLINK-12494:
-----------------------------------
Labels: auto-unassigned pull-request-available stale-major (was:
auto-unassigned pull-request-available)
I am the [Flink Jira Bot|https://github.com/apache/flink-jira-bot/] and I help
the community manage its development. I see this issues has been marked as
Major but is unassigned and neither itself nor its Sub-Tasks have been updated
for 30 days. I have gone ahead and added a "stale-major" to the issue". If this
ticket is a Major, please either assign yourself or give an update. Afterwards,
please remove the label or in 7 days the issue will be deprioritized.
> JDBCOutputFormat support reconnect when link failure and flush by timeInterval
> ------------------------------------------------------------------------------
>
> Key: FLINK-12494
> URL: https://issues.apache.org/jira/browse/FLINK-12494
> Project: Flink
> Issue Type: Bug
> Components: Connectors / JDBC
> Affects Versions: 1.8.0
> Reporter: zzsmdfj
> Priority: Major
> Labels: auto-unassigned, pull-request-available, stale-major
> Time Spent: 0.5h
> Remaining Estimate: 0h
>
> when i JDBCSink(flink-1.4.2) wite recode to mysql,find exception as flow :
>
> {code:java}
> java.util.concurrent.ExecutionException:
> com.mysql.jdbc.exceptions.jdbc4.CommunicationsException: Communications link
> failure
> The last packet successfully received from the server was 265,251
> milliseconds ago. The last packet sent successfully to the server was 265,252
> milliseconds ago.
> at
> java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
> at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
> at
> org.apache.flink.streaming.api.operators.async.queue.StreamRecordQueueEntry.get(StreamRecordQueueEntry.java:68)
> at
> org.apache.flink.streaming.api.operators.async.Emitter.output(Emitter.java:129)
> ... 2 more
> Caused by: com.mysql.jdbc.exceptions.jdbc4.CommunicationsException:
> Communications link failure
> The last packet successfully received from the server was 265,251
> milliseconds ago. The last packet sent successfully to the server was 265,252
> milliseconds ago.
> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
> at
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
> at
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
> at com.mysql.jdbc.Util.handleNewInstance(Util.java:411)
> at com.mysql.jdbc.SQLError.createCommunicationsException(SQLError.java:1116)
> at com.mysql.jdbc.MysqlIO.send(MysqlIO.java:3364)
> at com.mysql.jdbc.MysqlIO.sendCommand(MysqlIO.java:1983)
> at com.mysql.jdbc.MysqlIO.sqlQueryDirect(MysqlIO.java:2163)
> at com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2624)
> at
> com.mysql.jdbc.PreparedStatement.executeInternal(PreparedStatement.java:2127)
> at com.mysql.jdbc.PreparedStatement.executeQuery(PreparedStatement.java:2293)
> at
> org.apache.flink.table.plan.nodes.datastream.dimension.JDBCDimensionTableFunction.executeQuery(JDBCDimensionTableFunction.scala:199)
> at
> org.apache.flink.table.plan.nodes.datastream.dimension.JDBCDimensionTableFunction.executeQueryAndCombine(JDBCDimensionTableFunction.scala:139)
> at
> org.apache.flink.table.plan.nodes.datastream.dimension.JDBCDimensionTableFunction$$anon$1.apply(JDBCDimensionTableFunction.scala:83)
> at
> org.apache.flink.table.plan.nodes.datastream.dimension.JDBCDimensionTableFunction$$anon$1.apply(JDBCDimensionTableFunction.scala:73)
> at
> org.apache.flink.streaming.api.functions.async.DimensionTableJoinFunction.lambda$asyncInvoke$0(DimensionTableJoinFunction.java:105)
> at akka.dispatch.Futures$$anonfun$future$1.apply(Future.scala:97)
> at
> scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
> at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> ... 1 more
> Caused by: java.net.SocketException: Connection reset
> at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:113)
> at java.net.SocketOutputStream.write(SocketOutputStream.java:153)
> at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
> at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:140)
> at com.mysql.jdbc.MysqlIO.send(MysqlIO.java:3345)
> ... 16 more
> {code}
> i think it is too long not write record by connection(idleConnection),server
> close connection initiative. sparse data is relatively common in fact, so i
> think we should reconnect when then connection is invalid。
> besides,i find JDBCOutputFormat.flush only call by snapshotState method and
> "batchCount >= batchInterval",also if ours sink records is sparse, we will
> find actual write happended by very large time delay,so should we add a flush
> condition:currentTime- lastFlushTime > timeInterval?
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)