[ 
https://issues.apache.org/jira/browse/FLINK-33486?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17798684#comment-17798684
 ] 

David Christle commented on FLINK-33486:
----------------------------------------

We also observe this issue, typically at higher QPS. When it triggers, the task 
will restart, but future checkpoints fail if the tolerable checkpoint failure 
count is greater than zero. Setting the tolerable checkpoint failure count to 
zero triggers a more complete restart, which fixes the issue, but it means 
publishing is paused for a few minutes, which is not ideal.

At least for the best effort/at-least-once delivery modes, is there some way to 
implement a retry when send timeout triggers? This way, we'd potentially 
publish a single message/batch twice, rather than triggering a full failure + 
republishing all messages since the last checkpoint.

> Pulsar Client Send Timeout Terminates TaskManager
> -------------------------------------------------
>
>                 Key: FLINK-33486
>                 URL: https://issues.apache.org/jira/browse/FLINK-33486
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Pulsar
>    Affects Versions: 1.17.1
>            Reporter: Jason Kania
>            Priority: Major
>
> Currently, when the Pulsar Producer encounters a timeout when attempting to 
> send data, it generates an unhandled TimeoutException. This is not a 
> reasonable way to handle the timeout. The situation should be handled in a 
> graceful way either through additional parameters that put control of the 
> action under the discretion of the user or through some callback mechanism 
> that the user can work with to write code. Unfortunately, fight now, this 
> causes a termination of the task manager which then leads to other issues.
> Increasing the timeout period to avoid the issue is not really an option to 
> ensure proper handling in the event that the situation does occur.
> The exception is as follows:
> org.apache.flink.util.FlinkRuntimeException: Failed to send data to Pulsar: 
> persistent://public/default/myproducer-partition-0
>         at 
> org.apache.flink.connector.pulsar.sink.writer.PulsarWriter.throwSendingException(PulsarWriter.java:182)
>  ~[flink-connector-pulsar-4.0.0-1.17.jar:4.0.0-1.17]
>         at 
> org.apache.flink.connector.pulsar.sink.writer.PulsarWriter.lambda$write$0(PulsarWriter.java:172)
>  ~[flink-connector-pulsar-4.0.0-1.17.jar:4.0.0-1.17]
>         at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
>  ~[flink-dist-1.17.1.jar:1.17.1]
>         at 
> org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90) 
> ~[flink-dist-1.17.1.jar:1.17.1]
>         at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMail(MailboxProcessor.java:398)
>  ~[flink-dist-1.17.1.jar:1.17.1]
>         at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:367)
>  ~[flink-dist-1.17.1.jar:1.17.1]
>         at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:352)
>  ~[flink-dist-1.17.1.jar:1.17.1]
>         at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:229)
>  ~[flink-dist-1.17.1.jar:1.17.1]
>         at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:839)
>  ~[flink-dist-1.17.1.jar:1.17.1]
>         at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:788)
>  ~[flink-dist-1.17.1.jar:1.17.1]
>         at 
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952)
>  ~[flink-dist-1.17.1.jar:1.17.1]
>         at 
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:931) 
> [flink-dist-1.17.1.jar:1.17.1]
>         at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745) 
> [flink-dist-1.17.1.jar:1.17.1]
>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562) 
> [flink-dist-1.17.1.jar:1.17.1]
>         at java.lang.Thread.run(Thread.java:829) [?:?]
> Caused by: 
> org.apache.pulsar.client.api.PulsarClientException$TimeoutException: The 
> producer myproducer- f4b1580b-1ea8-4c21-9d0b-da4d12ca6f93 can not send 
> message to the topic persistent://public/default/myproducer-partition-0 
> within given timeout
>         at 
> org.apache.pulsar.client.impl.ProducerImpl.run(ProducerImpl.java:1993) 
> ~[pulsar-client-all-2.11.2.jar:2.11.2]
>         at 
> org.apache.pulsar.shade.io.netty.util.HashedWheelTimer$HashedWheelTimeout.run(HashedWheelTimer.java:715)
>  ~[pulsar-client-all-2.11.2.jar:2.11.2]
>         at 
> org.apache.pulsar.shade.io.netty.util.concurrent.ImmediateExecutor.execute(ImmediateExecutor.java:34)
>  ~[pulsar-client-all-2.11.2.jar:2.11.2]
>         at 
> org.apache.pulsar.shade.io.netty.util.HashedWheelTimer$HashedWheelTimeout.expire(HashedWheelTimer.java:703)
>  ~[pulsar-client-all-2.11.2.jar:2.11.2]
>         at 
> org.apache.pulsar.shade.io.netty.util.HashedWheelTimer$HashedWheelBucket.expireTimeouts(HashedWheelTimer.java:790)
>  ~[pulsar-client-all-2.11.2.jar:2.11.2]
>         at 
> org.apache.pulsar.shade.io.netty.util.HashedWheelTimer$Worker.run(HashedWheelTimer.java:503)
>  ~[pulsar-client-all-2.11.2.jar:2.11.2]
>         at 
> org.apache.pulsar.shade.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
>  ~[pulsar-client-all-2.11.2.jar:2.11.2]
>         ... 1 more
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to