Wei Hao created FLINK-21743:
-------------------------------
Summary: JdbcXaSinkFunction throws XAER_RMFAIL when calling
snapshotState and beginTx
Key: FLINK-21743
URL: https://issues.apache.org/jira/browse/FLINK-21743
Project: Flink
Issue Type: Test
Components: Connectors / JDBC
Affects Versions: 1.13.0
Environment: org.apache.flink:flink-streaming-java_2.11:1.12.1
org.apache.flink:flink-connector-jdbc_2.11:1.13-SNAPSHOT
Reporter: Wei Hao
{code:java}
public void snapshotState(FunctionSnapshotContext context) throws Exception {
LOG.debug("snapshot state, checkpointId={}", context.getCheckpointId());
this.rollbackPreparedFromCheckpoint(context.getCheckpointId());
this.prepareCurrentTx(context.getCheckpointId());
this.beginTx(context.getCheckpointId() + 1L);
this.stateHandler.store(JdbcXaSinkFunctionState.of(this.preparedXids,
this.hangingXids));
}
{code}
When checkpointing starts, it calls snapshotState(), which ends and prepares
the current transaction. The issue I found is with beginTx(), where a new Xid
is generated and xaFacade will run command like 'xa start new_xid', which will
throw the exception as shown below and causes checkpointing failure.
{code:java}
Caused by: org.apache.flink.connector.jdbc.xa.XaFacade$TransientXaException:
com.mysql.cj.jdbc.MysqlXAException: XAER_RMFAIL: The command cannot be executed
when global transaction is in the PREPARED stateCaused by:
org.apache.flink.connector.jdbc.xa.XaFacade$TransientXaException:
com.mysql.cj.jdbc.MysqlXAException: XAER_RMFAIL: The command cannot be executed
when global transaction is in the PREPARED state at
org.apache.flink.connector.jdbc.xa.XaFacadeImpl.wrapException(XaFacadeImpl.java:353)
at
org.apache.flink.connector.jdbc.xa.XaFacadeImpl.access$800(XaFacadeImpl.java:66)
at
org.apache.flink.connector.jdbc.xa.XaFacadeImpl$Command.lambda$fromRunnable$0(XaFacadeImpl.java:288)
at
org.apache.flink.connector.jdbc.xa.XaFacadeImpl$Command.lambda$fromRunnable$4(XaFacadeImpl.java:327)
at
org.apache.flink.connector.jdbc.xa.XaFacadeImpl.execute(XaFacadeImpl.java:267)
at org.apache.flink.connector.jdbc.xa.XaFacadeImpl.start(XaFacadeImpl.java:160)
at
org.apache.flink.connector.jdbc.xa.JdbcXaSinkFunction.beginTx(JdbcXaSinkFunction.java:302)
at
org.apache.flink.connector.jdbc.xa.JdbcXaSinkFunction.snapshotState(JdbcXaSinkFunction.java:241)
at
org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:118)
at
org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:99)
at
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:89)
at
org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:205)
... 23 more
{code}
I think the scenario is quite predictable because it is how xa transaction
works.
The MySQL shell example below behaves quite similar to how JdbcXaSinkFunction
works.
{code:java}
xa start "1111";
# Inserting some rows
# end the current transaction
xa end "1111";
xa prepare "1111";
# start a new transaction with the same connection while the previous one is
PREPARED
xa prepare "2222";
{code}
This also produces error 'SQL Error [1399] [XAE07]: XAER_RMFAIL: The command
cannot be executed when global transaction is in the PREPARED state'.
--
This message was sent by Atlassian Jira
(v8.3.4#803005)