[
https://issues.apache.org/jira/browse/FLINK-30131?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17637165#comment-17637165
]
Weijie Guo commented on FLINK-30131:
------------------------------------
[~landlord] Thanks for reporting this.
I have not investigated this issue in depth yet, but here are some initial
observations and questions:
1. The stack shows the thread blocked in `requestMemorySegmentBlocking` while
writing shuffle data. In theory it should not stay blocked forever: buffers
are recycled as the downstream task consumes data, and the request can then
proceed. So, is your job hanging forever?
2. In what environment did you run this job: locally, or on YARN/K8s? Knowing
this will help reproduce the problem. Can you also provide your flink-conf.yaml?
3. If you need more network buffers, you only need to increase the total TM
memory and the network memory. Adjusting
`taskmanager.network.memory.buffers-per-channel` and
`taskmanager.network.memory.floating-buffers-per-gate` will not help solve this
problem; it will only make buffer requests more competitive.
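To put rough numbers on the buffer demand discussed above, here is a minimal
back-of-the-envelope sketch. It assumes the default
`taskmanager.memory.segment-size` of 32 KiB, ignores serialization overhead,
and treats the "4g" from the reproduction code as the actual pool size, which
is only the configured upper bound. The class and method names are
hypothetical and are not part of Flink.

```java
public class NetworkBufferEstimate {
    // Assumption: Flink's default taskmanager.memory.segment-size (32 KiB).
    public static final long SEGMENT_SIZE_BYTES = 32L * 1024;

    /** Number of buffers needed to hold one record (ceiling division). */
    public static long buffersPerRecord(long recordBytes, long segmentBytes) {
        return (recordBytes + segmentBytes - 1) / segmentBytes;
    }

    /** Number of buffers a network memory pool of the given size can provide. */
    public static long buffersInPool(long networkMemoryBytes, long segmentBytes) {
        return networkMemoryBytes / segmentBytes;
    }

    public static void main(String[] args) {
        // Payload size taken from the reproduction code: new byte[10000000].
        long recordBytes = 10_000_000L;
        // Assumption: the pool actually reaches the 4g cap set via NETWORK_MEMORY_MAX.
        long networkMemoryBytes = 4L * 1024 * 1024 * 1024;

        long perRecord = buffersPerRecord(recordBytes, SEGMENT_SIZE_BYTES);
        long pool = buffersInPool(networkMemoryBytes, SEGMENT_SIZE_BYTES);

        System.out.println("buffers per record: " + perRecord);
        System.out.println("buffers in pool:    " + pool);
        System.out.println("records in flight:  " + (pool / perRecord));
    }
}
```

Under these assumptions each record needs about 306 buffers, so even a large
pool holds only a few hundred records in flight. If records are re-fed into
the loop faster than they are consumed, the pool can still run dry, which
would match the report that more memory only delays the hang.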
> flink iterate will suspend when record is a bit large
> -----------------------------------------------------
>
> Key: FLINK-30131
> URL: https://issues.apache.org/jira/browse/FLINK-30131
> Project: Flink
> Issue Type: Bug
> Components: API / DataStream
> Affects Versions: 1.15.2
> Reporter: Lu
> Priority: Major
> Attachments: image-2022-11-22-14-59-08-272.png
>
>
>
> {code:java}
> // code placeholder
> Configuration configuration = new Configuration();
> configuration.setInteger(RestOptions.PORT, 8082);
> configuration.setInteger(NETWORK_MAX_BUFFERS_PER_CHANNEL, 10000000);
> configuration.set(TaskManagerOptions.NETWORK_MEMORY_MAX,
> MemorySize.parse("4g"));
> configuration.setInteger("taskmanager.network.memory.buffers-per-channel",
> 10000000);
> configuration.setInteger("taskmanager.network.memory.floating-buffers-per-gate",
> 10000000);
> final StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment(configuration);
> env.setParallelism(1);
> List<Integer> list = new ArrayList<>(10);
> for (int i = 1; i < 10000; i++) {
> list.add(i);
> }
> DataStreamSource<Integer> integerDataStreamSource = env.fromCollection(list);
> DataStream<byte[]> map = integerDataStreamSource.map(i -> new
> byte[10000000]).setParallelism(1).name("map to byte[]").shuffle();
> IterativeStream<byte[]> iterate = map.iterate();
> DataStream<byte[]> map1 = iterate.process(new ProcessFunction<byte[],
> byte[]>() {
> @Override
> public void processElement(byte[] value, ProcessFunction<byte[],
> byte[]>.Context ctx, Collector<byte[]> out) throws Exception {
> out.collect(value);
> }
> }).name("multi collect");
> DataStream<byte[]> filter = map1.filter(i -> true
> ).setParallelism(1).name("feedback");
> iterate.closeWith(filter);
> map1.map(bytes -> bytes.length).name("map to length").print();
> env.execute(); {code}
> My code is above.
>
> When I use iterate with large records, the iteration hangs at a random
> point. Looking at the stack, there is a suspicious thread:
> !image-2022-11-22-14-59-08-272.png|width=751,height=328!
> It looks like a network-related problem, so I increased the network buffer
> memory and the number of buffers. That only delays the hang: the job still
> hangs, just after a few more iterations than before.
> I want to know whether this is a bug, or whether there is an error in my
> code or configuration.
> Looking forward to your reply. Thanks in advance.
>
>
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)