[
https://issues.apache.org/jira/browse/FLINK-17559?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Luis updated FLINK-17559:
-------------------------
Description:
Back pressure for Flink seems broken. Someone please correct me, from what I
understand it only works between network transfers. If I have a source with no
thread sleep then there is no back pressure some operation will accumulate data
and crash. I even tried removing chaining with
env.disableOperatorChaining()
and it works with parallelism set to 1, but with 3 or 4 crashes. See below.
>From this I can conclude if I have any map function that produces more output
>that is coming in it will eventually crash if there is no network dividing
>them to allow for backpressure. Is this correct?
{code:java}
java.lang.OutOfMemoryError: Java heap space
2020-05-07 18:27:37,942 ERROR
org.apache.flink.runtime.util.FatalExitExceptionHandler - FATAL: Thread
'flink-scheduler-1' produced an uncaught exception. Stopping the process...
java.lang.OutOfMemoryError: Java heap space
at akka.dispatch.AbstractNodeQueue.<init>(AbstractNodeQueue.java:32)
at
akka.actor.LightArrayRevolverScheduler$TaskQueue.<init>(LightArrayRevolverScheduler.scala:305)
at
akka.actor.LightArrayRevolverScheduler$$anon$4.nextTick(LightArrayRevolverScheduler.scala:270)
at
akka.actor.LightArrayRevolverScheduler$$anon$4.run(LightArrayRevolverScheduler.scala:236)
at java.lang.Thread.run(Thread.java:748)
2020-05-07 18:27:35,725 ERROR
org.apache.flink.runtime.util.FatalExitExceptionHandler - FATAL: Thread
'flink-metrics-8' produced an uncaught exception. Stopping the process...
java.lang.OutOfMemoryError: Java heap space
2020-05-07 18:27:35,725 ERROR
com.rabbitmq.client.impl.ForgivingExceptionHandler - An unexpected
connection driver error occured
java.lang.OutOfMemoryError: Java heap space
at com.rabbitmq.client.impl.Frame.readFrom(Frame.java:120)
at
com.rabbitmq.client.impl.SocketFrameHandler.readFrame(SocketFrameHandler.java:164)
at
com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:580)
at java.lang.Thread.run(Thread.java:748)
{code}
[https://stackoverflow.com/questions/61465789/how-do-i-create-a-flink-richparallelsourcefunction-with-backpressure]
It seems that I am suppose guess how much my sink can handle and throttle to
that amount in my source generator. But that always puts my system of a risk of
crashing.
was:
Back pressure for Flink seems broken. Someone please correct me, from what I
understand it only works between network transfers. If I have a source with no
thread sleep then there is no back pressure some operation will accumulate data
and crash. I even tried removing chaining with
env.disableOperatorChaining()
and it works with parallelism set to 1, but with 3 or 4 crashes. See below.
>From this I can conclude if I have any map function that produces more output
>that is coming in it will eventually crash if there is no network dividing
>them to allow for backpressure. Is this correct?
{code:java}
java.lang.OutOfMemoryError: Java heap space
2020-05-07 18:27:37,942 ERROR
org.apache.flink.runtime.util.FatalExitExceptionHandler - FATAL: Thread
'flink-scheduler-1' produced an uncaught exception. Stopping the process...
java.lang.OutOfMemoryError: Java heap space
at akka.dispatch.AbstractNodeQueue.<init>(AbstractNodeQueue.java:32)
at
akka.actor.LightArrayRevolverScheduler$TaskQueue.<init>(LightArrayRevolverScheduler.scala:305)
at
akka.actor.LightArrayRevolverScheduler$$anon$4.nextTick(LightArrayRevolverScheduler.scala:270)
at
akka.actor.LightArrayRevolverScheduler$$anon$4.run(LightArrayRevolverScheduler.scala:236)
at java.lang.Thread.run(Thread.java:748)
2020-05-07 18:27:35,725 ERROR
org.apache.flink.runtime.util.FatalExitExceptionHandler - FATAL: Thread
'flink-metrics-8' produced an uncaught exception. Stopping the process...
java.lang.OutOfMemoryError: Java heap space
2020-05-07 18:27:35,725 ERROR
com.rabbitmq.client.impl.ForgivingExceptionHandler - An unexpected
connection driver error occured
java.lang.OutOfMemoryError: Java heap space
at com.rabbitmq.client.impl.Frame.readFrom(Frame.java:120)
at
com.rabbitmq.client.impl.SocketFrameHandler.readFrame(SocketFrameHandler.java:164)
at
com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:580)
at java.lang.Thread.run(Thread.java:748)
{code}
[https://stackoverflow.com/questions/61465789/how-do-i-create-a-flink-richparallelsourcefunction-with-backpressure]
> Backpressure seems to be broken when not going through network
> --------------------------------------------------------------
>
> Key: FLINK-17559
> URL: https://issues.apache.org/jira/browse/FLINK-17559
> Project: Flink
> Issue Type: Bug
> Components: API / Core, Connectors/ RabbitMQ
> Affects Versions: 1.8.2
> Reporter: Luis
> Priority: Major
> Attachments: Screenshot from 2020-05-07 13-31-23.png
>
>
> Back pressure for Flink seems broken. Someone please correct me, from what I
> understand it only works between network transfers. If I have a source with
> no thread sleep then there is no back pressure some operation will accumulate
> data and crash. I even tried removing chaining with
> env.disableOperatorChaining()
> and it works with parallelism set to 1, but with 3 or 4 crashes. See below.
>
> From this I can conclude if I have any map function that produces more output
> that is coming in it will eventually crash if there is no network dividing
> them to allow for backpressure. Is this correct?
>
>
> {code:java}
> java.lang.OutOfMemoryError: Java heap space
> 2020-05-07 18:27:37,942 ERROR
> org.apache.flink.runtime.util.FatalExitExceptionHandler - FATAL: Thread
> 'flink-scheduler-1' produced an uncaught exception. Stopping the process...
> java.lang.OutOfMemoryError: Java heap space
> at akka.dispatch.AbstractNodeQueue.<init>(AbstractNodeQueue.java:32)
> at
> akka.actor.LightArrayRevolverScheduler$TaskQueue.<init>(LightArrayRevolverScheduler.scala:305)
> at
> akka.actor.LightArrayRevolverScheduler$$anon$4.nextTick(LightArrayRevolverScheduler.scala:270)
> at
> akka.actor.LightArrayRevolverScheduler$$anon$4.run(LightArrayRevolverScheduler.scala:236)
> at java.lang.Thread.run(Thread.java:748)
> 2020-05-07 18:27:35,725 ERROR
> org.apache.flink.runtime.util.FatalExitExceptionHandler - FATAL: Thread
> 'flink-metrics-8' produced an uncaught exception. Stopping the process...
> java.lang.OutOfMemoryError: Java heap space
> 2020-05-07 18:27:35,725 ERROR
> com.rabbitmq.client.impl.ForgivingExceptionHandler - An unexpected
> connection driver error occured
> java.lang.OutOfMemoryError: Java heap space
> at com.rabbitmq.client.impl.Frame.readFrom(Frame.java:120)
> at
> com.rabbitmq.client.impl.SocketFrameHandler.readFrame(SocketFrameHandler.java:164)
> at
> com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:580)
> at java.lang.Thread.run(Thread.java:748)
> {code}
>
> [https://stackoverflow.com/questions/61465789/how-do-i-create-a-flink-richparallelsourcefunction-with-backpressure]
>
>
> It seems that I am suppose guess how much my sink can handle and throttle to
> that amount in my source generator. But that always puts my system of a risk
> of crashing.
>
>
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)