[
https://issues.apache.org/jira/browse/FLINK-33486?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17837160#comment-17837160
]
Yufan Sheng commented on FLINK-33486:
-------------------------------------
The retry logic is provided internally by the Pulsar client. Should we
implement this feature on the Flink side? I don't think so. But
[~dchristle]'s suggestion sounds reasonable to me: add retry logic in
`at-least-once` mode and let it crash in `exactly-once` mode.
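A minimal Java sketch of that policy, under stated assumptions: the send is modelled as a `Callable` that throws `java.util.concurrent.TimeoutException` as a stand-in for Pulsar's `PulsarClientException$TimeoutException`, and the `Guarantee` enum, the `sendWithPolicy` helper, and the `maxRetries` parameter are hypothetical names, not part of the Flink Pulsar connector.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.TimeoutException;

public class TimeoutPolicySketch {

    // Hypothetical stand-in for the sink's delivery guarantee setting.
    enum Guarantee { AT_LEAST_ONCE, EXACTLY_ONCE }

    /**
     * Hypothetical helper: runs one send attempt at a time.
     * AT_LEAST_ONCE: a timeout is retried up to maxRetries extra times.
     * EXACTLY_ONCE: the first timeout is rethrown ("let it crash"), so the
     * caller fails instead of retrying inside the writer.
     */
    static <T> T sendWithPolicy(Callable<T> send, Guarantee guarantee, int maxRetries)
            throws Exception {
        int retries = 0;
        while (true) {
            try {
                return send.call();
            } catch (TimeoutException e) {
                if (guarantee == Guarantee.EXACTLY_ONCE || retries >= maxRetries) {
                    throw e;
                }
                retries++;
            }
        }
    }

    public static void main(String[] args) throws Exception {
        int[] calls = {0};
        // Simulated send that times out twice before succeeding.
        Callable<String> flakySend = () -> {
            calls[0]++;
            if (calls[0] <= 2) {
                throw new TimeoutException("send timed out");
            }
            return "sent on attempt " + calls[0];
        };
        System.out.println(sendWithPolicy(flakySend, Guarantee.AT_LEAST_ONCE, 3));
    }
}
```

Running `main` prints `sent on attempt 3`. Passing `Guarantee.EXACTLY_ONCE` instead makes the first timeout propagate; in a real job that failure would then be handled by Flink's restart strategy rather than being retried inside the writer.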
> Pulsar Client Send Timeout Terminates TaskManager
> -------------------------------------------------
>
> Key: FLINK-33486
> URL: https://issues.apache.org/jira/browse/FLINK-33486
> Project: Flink
> Issue Type: Bug
> Components: Connectors / Pulsar
> Affects Versions: 1.17.1
> Reporter: Jason Kania
> Priority: Major
>
> Currently, when the Pulsar producer encounters a timeout while attempting to
> send data, it throws an unhandled TimeoutException. This is not a
> reasonable way to handle the timeout. The situation should be handled
> gracefully, either through additional parameters that let the user control
> the action or through a callback mechanism that the user can implement.
> Unfortunately, right now, this causes the TaskManager to terminate, which
> then leads to other issues. Increasing the timeout period to avoid the issue
> is not really an option, because the situation still needs to be handled
> properly when it does occur.
> The exception is as follows:
> org.apache.flink.util.FlinkRuntimeException: Failed to send data to Pulsar: persistent://public/default/myproducer-partition-0
>     at org.apache.flink.connector.pulsar.sink.writer.PulsarWriter.throwSendingException(PulsarWriter.java:182) ~[flink-connector-pulsar-4.0.0-1.17.jar:4.0.0-1.17]
>     at org.apache.flink.connector.pulsar.sink.writer.PulsarWriter.lambda$write$0(PulsarWriter.java:172) ~[flink-connector-pulsar-4.0.0-1.17.jar:4.0.0-1.17]
>     at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50) ~[flink-dist-1.17.1.jar:1.17.1]
>     at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90) ~[flink-dist-1.17.1.jar:1.17.1]
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMail(MailboxProcessor.java:398) ~[flink-dist-1.17.1.jar:1.17.1]
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:367) ~[flink-dist-1.17.1.jar:1.17.1]
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:352) ~[flink-dist-1.17.1.jar:1.17.1]
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:229) ~[flink-dist-1.17.1.jar:1.17.1]
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:839) ~[flink-dist-1.17.1.jar:1.17.1]
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:788) ~[flink-dist-1.17.1.jar:1.17.1]
>     at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952) ~[flink-dist-1.17.1.jar:1.17.1]
>     at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:931) [flink-dist-1.17.1.jar:1.17.1]
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745) [flink-dist-1.17.1.jar:1.17.1]
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562) [flink-dist-1.17.1.jar:1.17.1]
>     at java.lang.Thread.run(Thread.java:829) [?:?]
> Caused by: org.apache.pulsar.client.api.PulsarClientException$TimeoutException: The producer myproducer- f4b1580b-1ea8-4c21-9d0b-da4d12ca6f93 can not send message to the topic persistent://public/default/myproducer-partition-0 within given timeout
>     at org.apache.pulsar.client.impl.ProducerImpl.run(ProducerImpl.java:1993) ~[pulsar-client-all-2.11.2.jar:2.11.2]
>     at org.apache.pulsar.shade.io.netty.util.HashedWheelTimer$HashedWheelTimeout.run(HashedWheelTimer.java:715) ~[pulsar-client-all-2.11.2.jar:2.11.2]
>     at org.apache.pulsar.shade.io.netty.util.concurrent.ImmediateExecutor.execute(ImmediateExecutor.java:34) ~[pulsar-client-all-2.11.2.jar:2.11.2]
>     at org.apache.pulsar.shade.io.netty.util.HashedWheelTimer$HashedWheelTimeout.expire(HashedWheelTimer.java:703) ~[pulsar-client-all-2.11.2.jar:2.11.2]
>     at org.apache.pulsar.shade.io.netty.util.HashedWheelTimer$HashedWheelBucket.expireTimeouts(HashedWheelTimer.java:790) ~[pulsar-client-all-2.11.2.jar:2.11.2]
>     at org.apache.pulsar.shade.io.netty.util.HashedWheelTimer$Worker.run(HashedWheelTimer.java:503) ~[pulsar-client-all-2.11.2.jar:2.11.2]
>     at org.apache.pulsar.shade.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[pulsar-client-all-2.11.2.jar:2.11.2]
>     ... 1 more
>
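The issue description also asks for a callback mechanism that puts timeout handling under the user's control. A hedged sketch of what such a hook could look like, assuming a hypothetical `SendFailureHandler` interface with `RETRY`, `SKIP`, and `FAIL` actions (none of these exist in the Flink Pulsar connector) and a simulated `Sender` in place of the real Pulsar producer:

```java
import java.util.concurrent.TimeoutException;

public class SendFailureCallbackSketch {

    // Hypothetical actions a user-supplied handler could choose from.
    enum Action { RETRY, SKIP, FAIL }

    // Hypothetical callback interface; not part of the Flink Pulsar connector.
    @FunctionalInterface
    interface SendFailureHandler {
        Action onFailure(String topic, Throwable cause, int attempt);
    }

    // Simulated transport; a real writer would call the Pulsar producer here.
    @FunctionalInterface
    interface Sender {
        void send(String topic, String payload) throws TimeoutException;
    }

    /** Returns true if the record was sent, false if the handler chose SKIP. */
    static boolean write(Sender sender, SendFailureHandler handler,
                         String topic, String payload) throws TimeoutException {
        for (int attempt = 1; ; attempt++) {
            try {
                sender.send(topic, payload);
                return true;
            } catch (TimeoutException e) {
                switch (handler.onFailure(topic, e, attempt)) {
                    case RETRY:
                        continue;        // try the same record again
                    case SKIP:
                        return false;    // drop the record and keep the task running
                    default:
                        throw e;         // FAIL: same behaviour as today
                }
            }
        }
    }

    public static void main(String[] args) throws TimeoutException {
        int[] calls = {0};
        Sender alwaysTimesOut = (topic, payload) -> {
            calls[0]++;
            throw new TimeoutException("send timed out");
        };
        // Example user policy: retry twice, then drop the record.
        SendFailureHandler handler =
                (topic, cause, attempt) -> attempt < 3 ? Action.RETRY : Action.SKIP;
        boolean sent = write(alwaysTimesOut, handler,
                "persistent://public/default/myproducer-partition-0", "payload");
        System.out.println("sent=" + sent + " after " + calls[0] + " attempts");
    }
}
```

Running `main` prints `sent=false after 3 attempts`. `SKIP` silently drops the record, so a handler that returns it gives up the sink's delivery guarantee for that record; `FAIL` reproduces the current behaviour.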