[
https://issues.apache.org/jira/browse/FLINK-21743?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Roman Khachatryan reassigned FLINK-21743:
-----------------------------------------
Assignee: Roman Khachatryan
> JdbcXaSinkFunction throws XAER_RMFAIL when calling snapshotState and beginTx
> ----------------------------------------------------------------------------
>
> Key: FLINK-21743
> URL: https://issues.apache.org/jira/browse/FLINK-21743
> Project: Flink
> Issue Type: Test
> Components: Connectors / JDBC
> Affects Versions: 1.13.0
> Environment: org.apache.flink:flink-streaming-java_2.11:1.12.1
> org.apache.flink:flink-connector-jdbc_2.11:1.13-SNAPSHOT
> Reporter: Wei Hao
> Assignee: Roman Khachatryan
> Priority: Major
>
> {code:java}
> public void snapshotState(FunctionSnapshotContext context) throws Exception {
>     LOG.debug("snapshot state, checkpointId={}", context.getCheckpointId());
>     this.rollbackPreparedFromCheckpoint(context.getCheckpointId());
>     this.prepareCurrentTx(context.getCheckpointId());
>     this.beginTx(context.getCheckpointId() + 1L);
>     this.stateHandler.store(
>         JdbcXaSinkFunctionState.of(this.preparedXids, this.hangingXids));
> }
> {code}
> When checkpointing starts, snapshotState() is called, which ends and prepares
> the current transaction. The issue I found is with beginTx(): it generates a
> new Xid and xaFacade runs a command like 'xa start new_xid', which throws the
> exception shown below and causes the checkpoint to fail.
> {code:java}
> Caused by: org.apache.flink.connector.jdbc.xa.XaFacade$TransientXaException: com.mysql.cj.jdbc.MysqlXAException: XAER_RMFAIL: The command cannot be executed when global transaction is in the PREPARED state
>     at org.apache.flink.connector.jdbc.xa.XaFacadeImpl.wrapException(XaFacadeImpl.java:353)
>     at org.apache.flink.connector.jdbc.xa.XaFacadeImpl.access$800(XaFacadeImpl.java:66)
>     at org.apache.flink.connector.jdbc.xa.XaFacadeImpl$Command.lambda$fromRunnable$0(XaFacadeImpl.java:288)
>     at org.apache.flink.connector.jdbc.xa.XaFacadeImpl$Command.lambda$fromRunnable$4(XaFacadeImpl.java:327)
>     at org.apache.flink.connector.jdbc.xa.XaFacadeImpl.execute(XaFacadeImpl.java:267)
>     at org.apache.flink.connector.jdbc.xa.XaFacadeImpl.start(XaFacadeImpl.java:160)
>     at org.apache.flink.connector.jdbc.xa.JdbcXaSinkFunction.beginTx(JdbcXaSinkFunction.java:302)
>     at org.apache.flink.connector.jdbc.xa.JdbcXaSinkFunction.snapshotState(JdbcXaSinkFunction.java:241)
>     at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:118)
>     at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:99)
>     at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:89)
>     at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:205)
>     ... 23 more
> {code}
> I think this failure is predictable because it follows from how XA
> transactions work.
> The MySQL shell example below behaves much like JdbcXaSinkFunction does.
> {code:java}
> xa start "1111";
> # Inserting some rows
> # end the current transaction
> xa end "1111";
> xa prepare "1111";
> # start a new transaction on the same connection while the previous one is
> # still PREPARED
> xa start "2222";
> {code}
> This also produces the error 'SQL Error [1399] [XAE07]: XAER_RMFAIL: The
> command cannot be executed when global transaction is in the PREPARED state'.
>
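The behavior described in the report can be illustrated without a database. The sketch below is a simplified, hypothetical model of the per-connection XA state rules the reporter describes; it is not Flink or MySQL Connector/J code. The class name XaSessionModel, its methods, and XaRmFailException are all invented for illustration, and the model assumes one connection can hold only one XA transaction until that transaction is committed.

```java
/**
 * Hypothetical, simplified model of one connection's XA state, based only on
 * the behavior described in the report. Not Flink or MySQL driver code.
 */
public class XaSessionModel {
    enum State { NONE, ACTIVE, IDLE, PREPARED }

    /** Stand-in for the driver's XAER_RMFAIL error; an invented class. */
    static class XaRmFailException extends RuntimeException {
        XaRmFailException(String message) {
            super(message);
        }
    }

    private State state = State.NONE;
    private String xid;

    /** Models 'xa start': rejected while this connection still holds a transaction. */
    void start(String newXid) {
        if (state != State.NONE) {
            throw new XaRmFailException(
                "XAER_RMFAIL: The command cannot be executed when global "
                    + "transaction is in the " + state + " state");
        }
        xid = newXid;
        state = State.ACTIVE;
    }

    /** Models 'xa end': ACTIVE -> IDLE. */
    void end(String x) {
        expect(x, State.ACTIVE);
        state = State.IDLE;
    }

    /** Models 'xa prepare': IDLE -> PREPARED. */
    void prepare(String x) {
        expect(x, State.IDLE);
        state = State.PREPARED;
    }

    /** Models 'xa commit': PREPARED -> NONE, which frees the connection. */
    void commit(String x) {
        expect(x, State.PREPARED);
        xid = null;
        state = State.NONE;
    }

    // Simplification: the model reports any wrong state or wrong xid the same way.
    private void expect(String x, State required) {
        if (state != required || !x.equals(xid)) {
            throw new IllegalStateException(
                "expected xid " + xid + " in state " + required + ", got " + x + " in " + state);
        }
    }

    public static void main(String[] args) {
        XaSessionModel session = new XaSessionModel();
        // Same order as snapshotState(): prepare the current tx, then begin the next.
        session.start("1111");
        session.end("1111");
        session.prepare("1111");
        try {
            session.start("2222");
        } catch (XaRmFailException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

In this model the second start() succeeds only after commit("1111"), or when it is issued on a separate XaSessionModel instance (standing in for a second connection). Whether either option suits JdbcXaSinkFunction is not something this sketch settles.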
--
This message was sent by Atlassian Jira
(v8.3.4#803005)