[ 
https://issues.apache.org/jira/browse/FLINK-33486?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17837160#comment-17837160
 ] 

Yufan Sheng commented on FLINK-33486:
-------------------------------------

The retry logic is provided internally by the Pulsar client. Should we 
implement this feature on the Flink side? I don't think so. But the suggestion 
from [~dchristle] sounds reasonable to me: add retry logic in `at-least-once` 
mode and let it crash in `exactly-once` mode.
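
A minimal sketch of that split, written as plain Java with no Flink or Pulsar 
dependencies. Everything in it is a stand-in for illustration: `Guarantee` 
mirrors the two delivery modes mentioned above, `SendAction` represents one 
send attempt, `maxRetries` is a hypothetical setting, and 
`java.util.concurrent.TimeoutException` takes the place of the Pulsar client's 
timeout exception. It is not the connector's actual code.

```java
import java.util.concurrent.TimeoutException;

final class SendRetrySketch {

    // Stand-in for the sink's delivery mode; not the connector's own type.
    enum Guarantee { AT_LEAST_ONCE, EXACTLY_ONCE }

    // Stand-in for a single send attempt that may time out.
    interface SendAction {
        void run() throws TimeoutException;
    }

    /**
     * Runs the send action and returns the number of attempts it took.
     * On timeout, AT_LEAST_ONCE retries up to maxRetries times, while
     * EXACTLY_ONCE fails immediately so the error surfaces to the caller.
     */
    static int sendWithPolicy(Guarantee guarantee, int maxRetries, SendAction send) {
        int attempts = 0;
        while (true) {
            attempts++;
            try {
                send.run();
                return attempts;
            } catch (TimeoutException e) {
                boolean retriesExhausted = attempts > maxRetries;
                if (guarantee == Guarantee.EXACTLY_ONCE || retriesExhausted) {
                    throw new IllegalStateException(
                            "Failed to send after " + attempts + " attempt(s)", e);
                }
                // AT_LEAST_ONCE with retries left: loop and try again.
            }
        }
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Simulated producer that times out twice, then succeeds.
        SendAction flaky = () -> {
            if (++calls[0] < 3) {
                throw new TimeoutException("simulated send timeout");
            }
        };
        System.out.println(sendWithPolicy(Guarantee.AT_LEAST_ONCE, 5, flaky)); // prints 3
    }
}
```

A real implementation would likely also need a backoff between attempts and a 
decision on whether the retry count should be user-configurable; both are 
left out here.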

> Pulsar Client Send Timeout Terminates TaskManager
> -------------------------------------------------
>
>                 Key: FLINK-33486
>                 URL: https://issues.apache.org/jira/browse/FLINK-33486
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Pulsar
>    Affects Versions: 1.17.1
>            Reporter: Jason Kania
>            Priority: Major
>
> Currently, when the Pulsar Producer encounters a timeout when attempting to 
> send data, it generates an unhandled TimeoutException. This is not a 
> reasonable way to handle the timeout. The situation should be handled in a 
> graceful way either through additional parameters that put control of the 
> action under the discretion of the user or through some callback mechanism 
> that the user can work with to write code. Unfortunately, right now, this 
> causes a termination of the task manager which then leads to other issues.
> Increasing the timeout period only makes the issue less likely; it does not 
> ensure proper handling in the event that the situation does occur.
> The exception is as follows:
> org.apache.flink.util.FlinkRuntimeException: Failed to send data to Pulsar: 
> persistent://public/default/myproducer-partition-0
>         at 
> org.apache.flink.connector.pulsar.sink.writer.PulsarWriter.throwSendingException(PulsarWriter.java:182)
>  ~[flink-connector-pulsar-4.0.0-1.17.jar:4.0.0-1.17]
>         at 
> org.apache.flink.connector.pulsar.sink.writer.PulsarWriter.lambda$write$0(PulsarWriter.java:172)
>  ~[flink-connector-pulsar-4.0.0-1.17.jar:4.0.0-1.17]
>         at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
>  ~[flink-dist-1.17.1.jar:1.17.1]
>         at 
> org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90) 
> ~[flink-dist-1.17.1.jar:1.17.1]
>         at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMail(MailboxProcessor.java:398)
>  ~[flink-dist-1.17.1.jar:1.17.1]
>         at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:367)
>  ~[flink-dist-1.17.1.jar:1.17.1]
>         at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:352)
>  ~[flink-dist-1.17.1.jar:1.17.1]
>         at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:229)
>  ~[flink-dist-1.17.1.jar:1.17.1]
>         at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:839)
>  ~[flink-dist-1.17.1.jar:1.17.1]
>         at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:788)
>  ~[flink-dist-1.17.1.jar:1.17.1]
>         at 
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952)
>  ~[flink-dist-1.17.1.jar:1.17.1]
>         at 
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:931) 
> [flink-dist-1.17.1.jar:1.17.1]
>         at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745) 
> [flink-dist-1.17.1.jar:1.17.1]
>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562) 
> [flink-dist-1.17.1.jar:1.17.1]
>         at java.lang.Thread.run(Thread.java:829) [?:?]
> Caused by: 
> org.apache.pulsar.client.api.PulsarClientException$TimeoutException: The 
> producer myproducer- f4b1580b-1ea8-4c21-9d0b-da4d12ca6f93 can not send 
> message to the topic persistent://public/default/myproducer-partition-0 
> within given timeout
>         at 
> org.apache.pulsar.client.impl.ProducerImpl.run(ProducerImpl.java:1993) 
> ~[pulsar-client-all-2.11.2.jar:2.11.2]
>         at 
> org.apache.pulsar.shade.io.netty.util.HashedWheelTimer$HashedWheelTimeout.run(HashedWheelTimer.java:715)
>  ~[pulsar-client-all-2.11.2.jar:2.11.2]
>         at 
> org.apache.pulsar.shade.io.netty.util.concurrent.ImmediateExecutor.execute(ImmediateExecutor.java:34)
>  ~[pulsar-client-all-2.11.2.jar:2.11.2]
>         at 
> org.apache.pulsar.shade.io.netty.util.HashedWheelTimer$HashedWheelTimeout.expire(HashedWheelTimer.java:703)
>  ~[pulsar-client-all-2.11.2.jar:2.11.2]
>         at 
> org.apache.pulsar.shade.io.netty.util.HashedWheelTimer$HashedWheelBucket.expireTimeouts(HashedWheelTimer.java:790)
>  ~[pulsar-client-all-2.11.2.jar:2.11.2]
>         at 
> org.apache.pulsar.shade.io.netty.util.HashedWheelTimer$Worker.run(HashedWheelTimer.java:503)
>  ~[pulsar-client-all-2.11.2.jar:2.11.2]
>         at 
> org.apache.pulsar.shade.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
>  ~[pulsar-client-all-2.11.2.jar:2.11.2]
>         ... 1 more
>  


