[ 
https://issues.apache.org/jira/browse/FLINK-20972?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17265802#comment-17265802
 ] 

Yun Gao commented on FLINK-20972:
---------------------------------

One more point, possibly unrelated to this issue: an ordinary JDBC transaction 
is not enough to implement an exactly-once sink to a database. A JDBC 
transaction is generally aborted when its connection is closed, so on failover 
the connection to the database is closed and all pre-committed JDBC 
transactions are lost. An exactly-once JDBC sink would probably require XA 
transactions, and the level of XA support in each target database would need 
to be checked. More information is available in 
[this discussion|http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-JDBC-exactly-once-sink-td36424.html#a36431]
 and [this PR|https://github.com/apache/flink/pull/10847/files].
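
To illustrate why XA might help here: an XA transaction is identified by an 
Xid rather than by the connection that started it, so a prepared transaction 
can in principle be found and committed through a new connection after a 
failover. Below is a minimal sketch, assuming a hypothetical CheckpointXid 
class that derives the Xid from the checkpoint id and subtask index. The class 
name, the format id and the encoding are assumptions made for illustration and 
are not taken from the linked PR.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;
import javax.transaction.xa.Xid;

/** Hypothetical Xid derived from (checkpointId, subtaskIndex); not a Flink class. */
final class CheckpointXid implements Xid {
    // Arbitrary format id chosen for this sketch.
    private static final int FORMAT_ID = 201;

    private final byte[] globalTransactionId;
    private final byte[] branchQualifier;

    CheckpointXid(long checkpointId, int subtaskIndex) {
        // Deterministic encoding, so the same Xid can be rebuilt after a restart.
        this.globalTransactionId =
                ByteBuffer.allocate(Long.BYTES).putLong(checkpointId).array();
        this.branchQualifier =
                ByteBuffer.allocate(Integer.BYTES).putInt(subtaskIndex).array();
    }

    @Override
    public int getFormatId() {
        return FORMAT_ID;
    }

    @Override
    public byte[] getGlobalTransactionId() {
        return globalTransactionId.clone();
    }

    @Override
    public byte[] getBranchQualifier() {
        return branchQualifier.clone();
    }

    @Override
    public boolean equals(Object other) {
        if (!(other instanceof CheckpointXid)) {
            return false;
        }
        CheckpointXid that = (CheckpointXid) other;
        return Arrays.equals(globalTransactionId, that.globalTransactionId)
                && Arrays.equals(branchQualifier, that.branchQualifier);
    }

    @Override
    public int hashCode() {
        return 31 * Arrays.hashCode(globalTransactionId) + Arrays.hashCode(branchQualifier);
    }
}
```

With an XAResource obtained from the driver's XAConnection, a sink could call 
start(xid, XAResource.TMNOFLAGS), write its rows, then call 
end(xid, XAResource.TMSUCCESS) and prepare(xid) at pre-commit, and 
commit(xid, false) once the checkpoint completes. After a failover, 
recover(XAResource.TMSTARTRSCAN) on a new connection lists the Xids that are 
still prepared. Whether prepared transactions actually survive a closed 
connection depends on the database and driver.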

> TwoPhaseCommitSinkFunction Output a large amount of EventData
> -------------------------------------------------------------
>
>                 Key: FLINK-20972
>                 URL: https://issues.apache.org/jira/browse/FLINK-20972
>             Project: Flink
>          Issue Type: Improvement
>          Components: API / DataStream
>    Affects Versions: 1.12.0
>         Environment: flink 1.4.0 +
>            Reporter: huajiewang
>            Priority: Minor
>              Labels: easyfix, pull-request-available
>         Attachments: 1610682498960.jpg, 1610682603148.jpg, 
> Jdbc2PCSinkFunction.scala
>
>   Original Estimate: 1h
>  Remaining Estimate: 1h
>
> In TwoPhaseCommitSinkFunction, a large amount of event data may be written to 
> the log (LOG.info), which can cause an I/O bottleneck and waste disk space.
>  
> My code is in the attachment. A large amount of event data appears in the log 
> output by Flink, e.g.: 
> {code:java}
> Jdbc2PCSinkFunction 1/1 - checkpoint 4 complete, committing transaction 
> TransactionHolder{handle=Transaction(b420c880a951403984f231dd7e33597b, 
> ListBuffer(insert into table(field1,field2) value ('11','22') ... ... ), 
> transactionStartTime=1610426158532} from checkpoint 4{code}
> The relevant LOG.info call in TwoPhaseCommitSinkFunction is as follows:
> !1610682498960.jpg|width=838,height=630!
> {code:java}
> LOG.info(
>         "{} - checkpoint {} complete, committing transaction {} from checkpoint {}",
>         name(),
>         checkpointId,
>         pendingTransaction,
>         pendingTransactionCheckpointId); {code}
> This invokes the toString method of pendingTransaction (an instance of 
> TransactionHolder). 
> The toString method of TransactionHolder is:
> !1610682603148.jpg|width=859,height=327!
> {code:java}
> @Override
> public String toString() {
>     return "TransactionHolder{"
>             + "handle="
>             +  handle
>             + ", transactionStartTime="
>             + transactionStartTime
>             + '}';
> }{code}
>  handle is the concrete implementation of my Transaction. My Transaction has a 
> field of type List that buffers the incoming data, so all of the buffered 
> data is printed by LOG.info.
>   
>   
>  
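
Since TransactionHolder#toString simply concatenates the handle, as the 
description above shows, one possible workaround on the user side is to give 
the transaction handle a toString that prints only a summary. A minimal 
sketch, assuming a hypothetical BufferedJdbcTransaction class (the class and 
field names are illustrative and are not taken from the attachment):

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical transaction handle that buffers SQL statements until commit. */
final class BufferedJdbcTransaction {
    private final String transactionId;
    private final List<String> statements = new ArrayList<>();

    BufferedJdbcTransaction(String transactionId) {
        this.transactionId = transactionId;
    }

    void add(String sql) {
        statements.add(sql);
    }

    // Log only the id and the number of buffered statements, never the data itself.
    @Override
    public String toString() {
        return "Transaction{id=" + transactionId
                + ", bufferedStatements=" + statements.size() + '}';
    }
}
```

With a handle like this, the "committing transaction" log line would stay 
short regardless of how many records the transaction has buffered.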



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
