Well, I see. If the connection is established only when writing data into the DB, we need to cache the rows received since the last write.
IMO, we may not need to open connections repeatedly or introduce a connection pool. Testing and refreshing the connection periodically can solve this problem simply. I've implemented this at https://github.com/apache/flink/pull/6301. It would be kind of you to review it.

Best,
wangsan

> On Jul 11, 2018, at 2:25 PM, Hequn Cheng <chenghe...@gmail.com> wrote:
>
> Hi wangsan,
>
> What I mean is establishing a connection each time we write data into JDBC,
> i.e. establishing a connection in the flush() function. I think this will make
> sure the connection is ok. What do you think?
>
> On Wed, Jul 11, 2018 at 12:12 AM, wangsan <wamg...@163.com> wrote:
>
>> Hi Hequn,
>>
>> Establishing a connection for each batch write may also have the idle
>> connection problem, since we are not sure when the connection will be
>> closed. We call the flush() method when a batch is finished or when state
>> is snapshotted, but what if snapshots are not enabled and the batch size is
>> not reached before the connection is closed?
>>
>> Maybe we could use a Timer to test the connection periodically and keep
>> it alive. What do you think?
>>
>> I will open a jira and try to work on that issue.
>>
>> Best,
>> wangsan
>>
>>
>>
>> On Jul 10, 2018, at 8:38 PM, Hequn Cheng <chenghe...@gmail.com> wrote:
>>
>> Hi wangsan,
>>
>> I agree with you. It would be kind of you to open a jira to check the
>> problem.
>>
>> For the first problem, I think we need to establish a connection each time
>> we execute a batch write. And it is better to get the connection from a
>> connection pool.
>> For the second problem, to avoid the multithreading problem, I think we
>> should synchronize on the batch object in the flush() method.
>>
>> What do you think?
>>
>> Best, Hequn
>>
>>
>>
>> On Tue, Jul 10, 2018 at 2:36 PM, wangsan <wamg...@163.com> wrote:
>>
>>> Hi all,
>>>
>>> I'm going to use JDBCAppendTableSink and JDBCOutputFormat in my Flink
>>> application. But I am confused by the implementation of JDBCOutputFormat.
>>>
>>> 1. The Connection is established when JDBCOutputFormat is opened, and
>>> is used the whole time. But if this connection lies idle for a long time,
>>> the database will force-close the connection, so errors may occur.
>>> 2. The flush() method is called when batchCount exceeds the threshold,
>>> but it is also called while snapshotting state. So two threads may modify
>>> upload and batchCount without synchronization.
>>>
>>> Please correct me if I am wrong.
>>>
>>> --
>>> wangsan
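
For readers following the thread, the two ideas discussed above (testing and refreshing the connection periodically, and synchronizing flush()) could be sketched roughly as below. This is only an illustration under assumptions, not the code from the pull request and not Flink's JDBCOutputFormat: the class RefreshingBatchWriter, the ConnectionFactory and BatchSink hooks, and all parameter names are hypothetical. The only JDBC calls used are the standard Connection.isValid(int) and Connection.close().

```java
import java.sql.Connection;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch: a batch writer that guards its state and refreshes a dead connection. */
class RefreshingBatchWriter implements AutoCloseable {

    /** Hypothetical hook that opens a new connection, e.g. through DriverManager. */
    interface ConnectionFactory {
        Connection open() throws SQLException;
    }

    /** Hypothetical hook that writes one batch of rows over the given connection. */
    interface BatchSink {
        void write(Connection connection, List<String> rows) throws SQLException;
    }

    private final ConnectionFactory factory;
    private final BatchSink sink;
    private final int batchSize;
    private final int validationTimeoutSeconds;
    private final List<String> buffer = new ArrayList<>();
    private final ScheduledExecutorService timer =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "jdbc-keepalive");
                t.setDaemon(true); // do not keep the JVM alive for the keep-alive thread
                return t;
            });
    private Connection connection;

    RefreshingBatchWriter(ConnectionFactory factory, BatchSink sink,
                          int batchSize, int validationTimeoutSeconds) throws SQLException {
        this.factory = factory;
        this.sink = sink;
        this.batchSize = batchSize;
        this.validationTimeoutSeconds = validationTimeoutSeconds;
        this.connection = factory.open();
    }

    /** Starts the periodic connection test; the period is an assumption to tune per database. */
    void startKeepAlive(long periodSeconds) {
        timer.scheduleWithFixedDelay(() -> {
            try {
                checkConnection();
            } catch (SQLException e) {
                // A real sink would log this and decide whether to retry or fail the task.
            }
        }, periodSeconds, periodSeconds, TimeUnit.SECONDS);
    }

    /** Buffers a row and flushes once the batch size is reached. */
    synchronized void writeRecord(String row) throws SQLException {
        buffer.add(row);
        if (buffer.size() >= batchSize) {
            flush();
        }
    }

    /** Writes the buffered rows; synchronized so the writer and snapshot threads cannot interleave. */
    synchronized void flush() throws SQLException {
        if (buffer.isEmpty()) {
            return;
        }
        sink.write(connection, new ArrayList<>(buffer));
        buffer.clear();
    }

    /** Tests the connection and replaces it if the database has dropped it. */
    synchronized void checkConnection() throws SQLException {
        if (connection == null || !connection.isValid(validationTimeoutSeconds)) {
            closeQuietly();
            connection = factory.open();
        }
    }

    /** Flushes any remaining rows, stops the timer, and releases the connection. */
    @Override
    public synchronized void close() throws SQLException {
        timer.shutdownNow();
        try {
            flush();
        } finally {
            closeQuietly();
            connection = null;
        }
    }

    private void closeQuietly() {
        if (connection != null) {
            try {
                connection.close();
            } catch (SQLException ignored) {
                // The connection is being discarded anyway.
            }
        }
    }
}
```

Because writeRecord(), flush(), and checkConnection() all lock the same object, the keep-alive thread never swaps the connection in the middle of a batch write. The trade-off is that a slow isValid() call briefly blocks writers, so the validation timeout should stay small.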