[ 
https://issues.apache.org/jira/browse/FLINK-22141?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17322247#comment-17322247
 ] 

Yuan Mei commented on FLINK-22141:
----------------------------------

Manual test case:
https://github.com/apache/flink/pull/15627

Setup: connecting to PostgreSQL, sink parallelism: 12

Issues found:
1. We should set max_prepared_transactions to at least sink parallelism * 2,
i.e. at least 24 for this setup. With max_prepared_transactions = 20, I got an
error about exceeding max_prepared_transactions.

2. With max_prepared_transactions = 50, committing the XA transaction on
checkpoint completion fails:

{code:java}
2021-04-15 17:50:29,180 WARN  org.apache.flink.runtime.taskmanager.Task                    [] - Sink: Unnamed (2/12)#0 (e6a72f61a8b315d1a7a7f5ff2e160f69) switched from RUNNING to FAILED with failure cause: org.apache.flink.util.FlinkRuntimeException: failed to commit 1 transactions out of 1
        at org.apache.flink.connector.jdbc.xa.XaGroupOps$GroupXaOperationResult.wrapFailure(XaGroupOps.java:66)
        at org.apache.flink.connector.jdbc.xa.XaGroupOps$GroupXaOperationResult.lambda$throwIfAnyFailed$0(XaGroupOps.java:88)
        at java.base/java.util.Optional.map(Optional.java:265)
        at org.apache.flink.connector.jdbc.xa.XaGroupOps$GroupXaOperationResult.throwIfAnyFailed(XaGroupOps.java:86)
        at org.apache.flink.connector.jdbc.xa.XaGroupOpsImpl.commit(XaGroupOpsImpl.java:66)
        at org.apache.flink.connector.jdbc.xa.JdbcXaSinkFunction.commitUpToCheckpoint(JdbcXaSinkFunction.java:313)
        at org.apache.flink.connector.jdbc.xa.JdbcXaSinkFunction.notifyCheckpointComplete(JdbcXaSinkFunction.java:245)
        at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.notifyCheckpointComplete(AbstractUdfStreamOperator.java:130)
        at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.notifyCheckpointComplete(StreamOperatorWrapper.java:99)
        at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.notifyCheckpointComplete(SubtaskCheckpointCoordinatorImpl.java:330)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:1137)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$notifyCheckpointCompleteAsync$10(StreamTask.java:1102)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$notifyCheckpointOperation$12(StreamTask.java:1125)
        at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
        at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
        at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:344)
        at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:330)
        at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:202)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:657)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:623)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:776)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
        at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.flink.util.FlinkRuntimeException: unable to commit XA transaction, xid: 201:4315602f63ad3306a71b48933ff73a80010000000000000000000000:400443a7, error -3: resource manager error has occurred. [Not implemented: 2nd phase commit must be issued using an idle connection. commit xid=201:4315602f63ad3306a71b48933ff73a80010000000000000000000000:400443a7, currentXid=201:4315602f63ad3306a71b48933ff73a80010000000200000000000000:400443a7, state=ACTIVE, transactionState=IDLE]
        at org.apache.flink.connector.jdbc.xa.XaFacadeImpl.wrapException(XaFacadeImpl.java:356)
        at org.apache.flink.connector.jdbc.xa.XaFacadeImpl$Command.lambda$fromRunnableRecoverByWarn$1(XaFacadeImpl.java:310)
        at java.base/java.util.Optional.orElseThrow(Optional.java:408)
        at org.apache.flink.connector.jdbc.xa.XaFacadeImpl$Command.lambda$fromRunnableRecoverByWarn$2(XaFacadeImpl.java:308)
        at org.apache.flink.connector.jdbc.xa.XaFacadeImpl$Command.lambda$fromRunnable$4(XaFacadeImpl.java:327)
        at org.apache.flink.connector.jdbc.xa.XaFacadeImpl.execute(XaFacadeImpl.java:267)
        at org.apache.flink.connector.jdbc.xa.XaFacadeImpl.commit(XaFacadeImpl.java:195)
        at org.apache.flink.connector.jdbc.xa.XaGroupOpsImpl.commit(XaGroupOpsImpl.java:57)
        ... 18 more
{code}
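
The error message reads as if the commit of the already prepared transaction (commit xid=...01...) was issued on a connection that was still associated with another active transaction (currentXid=...02..., state=ACTIVE). The following is only a toy model of that check, to illustrate the situation. ToyDatabase, ToyXaConnection and the xid strings are hypothetical names made up for this sketch; this is neither the PostgreSQL driver code nor the Flink connector code, and committing from a separate idle connection is just one possible way around it, not necessarily the fix to apply.

```java
import java.util.HashSet;
import java.util.Set;

/** Toy model of the "idle connection" check; NOT pgjdbc or Flink code. */
public class IdleConnectionCommitSketch {

    /** Hypothetical stand-in for the server-side set of prepared transactions. */
    static final class ToyDatabase {
        final Set<String> prepared = new HashSet<>();
        final Set<String> committed = new HashSet<>();
    }

    /** Hypothetical stand-in for one XA connection; tracks at most one active xid. */
    static final class ToyXaConnection {
        private final ToyDatabase db;
        private String currentXid; // null means the connection is idle

        ToyXaConnection(ToyDatabase db) {
            this.db = db;
        }

        boolean isIdle() {
            return currentXid == null;
        }

        /** Associates this connection with a new transaction. */
        void start(String xid) {
            if (!isIdle()) {
                throw new IllegalStateException("already working on " + currentXid);
            }
            currentXid = xid;
        }

        /** Ends and prepares the active transaction, leaving the connection idle. */
        void endAndPrepare() {
            if (isIdle()) {
                throw new IllegalStateException("no active transaction");
            }
            db.prepared.add(currentXid);
            currentXid = null;
        }

        /** Second-phase commit: rejected unless this connection is idle. */
        void commitPrepared(String xid) {
            if (!isIdle()) {
                throw new IllegalStateException(
                        "2nd phase commit must be issued using an idle connection."
                                + " commit xid=" + xid + ", currentXid=" + currentXid);
            }
            if (!db.prepared.remove(xid)) {
                throw new IllegalStateException("unknown xid " + xid);
            }
            db.committed.add(xid);
        }
    }

    public static void main(String[] args) {
        ToyDatabase db = new ToyDatabase();
        ToyXaConnection writer = new ToyXaConnection(db);

        writer.start("xid-1");
        writer.endAndPrepare(); // xid-1 is prepared when the checkpoint is taken
        writer.start("xid-2");  // the sink keeps writing under the next xid

        try {
            // Same connection, still active on xid-2: the commit is rejected.
            writer.commitPrepared("xid-1");
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }

        // In this toy model, a separate idle connection can commit the prepared xid.
        ToyXaConnection committer = new ToyXaConnection(db);
        committer.commitPrepared("xid-1");
        System.out.println("committed: " + db.committed);
    }
}
```

In the model, the writer connection rejects the commit while it is busy with the next transaction, and a second connection that has no active transaction can complete it.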




> Manually test exactly-once JDBC sink
> ------------------------------------
>
>                 Key: FLINK-22141
>                 URL: https://issues.apache.org/jira/browse/FLINK-22141
>             Project: Flink
>          Issue Type: Test
>          Components: Connectors / JDBC
>            Reporter: Roman Khachatryan
>            Assignee: Yuan Mei
>            Priority: Blocker
>              Labels: pull-request-available, release-testing
>             Fix For: 1.13.0
>
>
> In FLINK-15578, an API and its implementation were added to the JDBC
> connector to support exactly-once semantics for sinks. The implementation
> uses JDBC XA transactions.
> The scope of this task is to make sure:
>  # The feature is well-documented
>  # The API is reasonably easy to use
>  # The implementation works as expected
>  ## normal case: database is updated on checkpointing
>  ## failure and recovery case: no duplicates inserted, no records skipped
>  ## several DBs: PostgreSQL, MSSQL, Oracle (MySQL has a known issue: 
> FLINK-21743)
>  ## concurrent checkpoints > 1, DoP > 1
>  # Logging is meaningful



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
