[ https://issues.apache.org/jira/browse/FLINK-21743?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17315273#comment-17315273 ]

Yuan Mei edited comment on FLINK-21743 at 4/6/21, 6:51 AM:
-----------------------------------------------------------

Hey [~weihao], I've tried the latest version of MySQL (8.0), and the same 
problem occurs (only one global transaction is supported, as you reported). 
From their documentation, https://dev.mysql.com/doc/refman/8.0/en/xa.html, it 
seems MySQL supports only one global (XA) transaction at a time, and it does 
not consider this a limitation or restriction: 
https://dev.mysql.com/doc/refman/8.0/en/xa-restrictions.html.

Allowing only one global transaction is a problem for `JdbcXaSinkFunction`. A 
transaction is prepared when a checkpoint is taken (in `snapshotState`), and the 
same transaction is committed or aborted only when the checkpoint-completion 
notification arrives. Meanwhile, `snapshotState` begins the next transaction on 
the same connection right away. The time between checkpointing and the 
notification is unpredictable, and the notification is best-effort and not 
guaranteed.
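
To make the constraint concrete, here is a toy Java model. It is a hypothetical sketch: the class, its methods and the exception are made up for illustration, and it is not Flink's `XaFacade` or MySQL's implementation. It models a connection that holds at most one global transaction. It then replays the call order of `snapshotState()` from the issue description: end and prepare transaction N, then immediately start N+1.

```java
/**
 * Toy model of a single MySQL connection that, matching the behavior reported
 * in this issue, rejects XA START while another global transaction on it is
 * still PREPARED. All names are hypothetical; this is not Flink or MySQL code.
 */
public class SingleXaConnectionModel {

    enum State { ACTIVE, IDLE, PREPARED }

    /** Stand-in for the XAER_RMFAIL error returned by the server. */
    static final class XaRmFailException extends RuntimeException {
        XaRmFailException(String message) {
            super(message);
        }
    }

    // The only global transaction this connection can hold (null if none).
    private String currentXid;
    private State state;

    void start(String xid) {
        if (currentXid != null) {
            throw new XaRmFailException("XAER_RMFAIL: cannot start " + xid
                    + " while " + currentXid + " is " + state);
        }
        currentXid = xid;
        state = State.ACTIVE;
    }

    void end(String xid) {
        expect(xid, State.ACTIVE);
        state = State.IDLE;
    }

    void prepare(String xid) {
        expect(xid, State.IDLE);
        state = State.PREPARED;
    }

    // What the checkpoint-complete notification would eventually trigger;
    // it frees the connection for the next global transaction.
    void commit(String xid) {
        expect(xid, State.PREPARED);
        currentXid = null;
        state = null;
    }

    private void expect(String xid, State expected) {
        if (!xid.equals(currentXid) || state != expected) {
            throw new IllegalStateException(xid + " is not " + expected);
        }
    }

    /** Same call order as snapshotState(): prepare tx N, then begin tx N+1. */
    static String snapshot(SingleXaConnectionModel conn, long checkpointId) {
        String current = "tx-" + checkpointId;
        conn.end(current);
        conn.prepare(current);
        try {
            conn.start("tx-" + (checkpointId + 1));
            return "started tx-" + (checkpointId + 1);
        } catch (XaRmFailException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        SingleXaConnectionModel conn = new SingleXaConnectionModel();
        conn.start("tx-1");
        // tx-1 is still PREPARED when tx-2 is started, so this prints the error.
        System.out.println(snapshot(conn, 1));

        // Only after tx-1 is committed can the connection start tx-2.
        conn.commit("tx-1");
        conn.start("tx-2");
        System.out.println("started tx-2 after committing tx-1");
    }
}
```

In this model, the second `start` succeeds only after the first transaction has been committed. That commit is the step that waits on the checkpoint notification.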

We've discussed this problem offline and will update here once we have a more 
concrete solution or conclusion. In the meantime, I think you could probably 
set `ExecutionCheckpointingOptions.MAX_CONCURRENT_CHECKPOINTS` to 1 and increase 
`ExecutionCheckpointingOptions.MIN_PAUSE_BETWEEN_CHECKPOINTS` (to a couple of 
seconds, for example) to mitigate the issue.
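
For reference, here is a small Java sketch that writes the two settings out as flink-conf.yaml entries. I'm assuming the configuration keys behind these options are `execution.checkpointing.max-concurrent-checkpoints` and `execution.checkpointing.min-pause`. The class and method names are hypothetical, and 3 seconds is only an example value. Programmatically, the equivalent should be `env.getCheckpointConfig().setMaxConcurrentCheckpoints(1)` and `env.getCheckpointConfig().setMinPauseBetweenCheckpoints(3000L)` (milliseconds) on a `StreamExecutionEnvironment`.

```java
import java.time.Duration;
import java.util.LinkedHashMap;
import java.util.Map;

public class XaCheckpointMitigation {

    /**
     * Builds flink-conf.yaml style entries for the two options discussed above.
     * Assumed keys: those of ExecutionCheckpointingOptions.MAX_CONCURRENT_CHECKPOINTS
     * and ExecutionCheckpointingOptions.MIN_PAUSE_BETWEEN_CHECKPOINTS.
     */
    static Map<String, String> mitigationEntries(Duration minPause) {
        if (minPause.isNegative() || minPause.isZero()) {
            throw new IllegalArgumentException("min pause must be positive");
        }
        Map<String, String> entries = new LinkedHashMap<>();
        // At most one checkpoint in flight at a time.
        entries.put("execution.checkpointing.max-concurrent-checkpoints", "1");
        // Wait at least this long after one checkpoint before starting the next.
        entries.put("execution.checkpointing.min-pause", minPause.toMillis() + " ms");
        return entries;
    }

    public static void main(String[] args) {
        // "A couple of seconds"; 3 s is an arbitrary example value.
        mitigationEntries(Duration.ofSeconds(3))
                .forEach((key, value) -> System.out.println(key + ": " + value));
    }
}
```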

Your pipeline would still fail over if the second transaction begins before the 
first one is committed or aborted, but this won't affect data correctness.



> JdbcXaSinkFunction throws XAER_RMFAIL when calling snapshotState and beginTx
> ----------------------------------------------------------------------------
>
>                 Key: FLINK-21743
>                 URL: https://issues.apache.org/jira/browse/FLINK-21743
>             Project: Flink
>          Issue Type: Test
>          Components: Connectors / JDBC
>    Affects Versions: 1.13.0
>         Environment: org.apache.flink:flink-streaming-java_2.11:1.12.1
> org.apache.flink:flink-connector-jdbc_2.11:1.13-SNAPSHOT
>            Reporter: Wei Hao
>            Assignee: Roman Khachatryan
>            Priority: Major
>
> {code:java}
> public void snapshotState(FunctionSnapshotContext context) throws Exception {
>     LOG.debug("snapshot state, checkpointId={}", context.getCheckpointId());
>     this.rollbackPreparedFromCheckpoint(context.getCheckpointId());
>     this.prepareCurrentTx(context.getCheckpointId());
>     this.beginTx(context.getCheckpointId() + 1L);
>     this.stateHandler.store(JdbcXaSinkFunctionState.of(this.preparedXids, 
>             this.hangingXids));
> }
> {code}
> When checkpointing starts, it calls snapshotState(), which ends and prepares 
> the current transaction. The issue I found is with beginTx(), where a new Xid 
> is generated and xaFacade runs a command like 'xa start new_xid'. That command 
> throws the exception shown below and causes the checkpoint to fail.
> {code:java}
> Caused by: org.apache.flink.connector.jdbc.xa.XaFacade$TransientXaException: com.mysql.cj.jdbc.MysqlXAException: XAER_RMFAIL: The command cannot be executed when global transaction is in the  PREPARED state
>     at org.apache.flink.connector.jdbc.xa.XaFacadeImpl.wrapException(XaFacadeImpl.java:353)
>     at org.apache.flink.connector.jdbc.xa.XaFacadeImpl.access$800(XaFacadeImpl.java:66)
>     at org.apache.flink.connector.jdbc.xa.XaFacadeImpl$Command.lambda$fromRunnable$0(XaFacadeImpl.java:288)
>     at org.apache.flink.connector.jdbc.xa.XaFacadeImpl$Command.lambda$fromRunnable$4(XaFacadeImpl.java:327)
>     at org.apache.flink.connector.jdbc.xa.XaFacadeImpl.execute(XaFacadeImpl.java:267)
>     at org.apache.flink.connector.jdbc.xa.XaFacadeImpl.start(XaFacadeImpl.java:160)
>     at org.apache.flink.connector.jdbc.xa.JdbcXaSinkFunction.beginTx(JdbcXaSinkFunction.java:302)
>     at org.apache.flink.connector.jdbc.xa.JdbcXaSinkFunction.snapshotState(JdbcXaSinkFunction.java:241)
>     at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:118)
>     at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:99)
>     at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:89)
>     at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:205)
>     ... 23 more
> {code}
> I think this failure is entirely predictable, because that is how XA 
> transactions work. The MySQL shell example below behaves much like 
> JdbcXaSinkFunction:
> {code:java}
> xa start "1111";
> # insert some rows
> # end and prepare the current transaction
> xa end "1111";
> xa prepare "1111";
> # start a new transaction on the same connection while the previous one is PREPARED
> xa start "2222";
> {code}
> This also produces the error 'SQL Error [1399] [XAE07]: XAER_RMFAIL: The command 
> cannot be executed when global transaction is in the PREPARED state'.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
