[
https://issues.apache.org/jira/browse/FLINK-22881?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Xintong Song reopened FLINK-22881:
----------------------------------
Reopening the ticket.
Reverted in 8758b21b44d84412113df03255f2bad9d2fb6372
The commit has broken the master branch. HiveTableSinkITCase constantly fails
after it is merged.
> Tasks are blocked while broadcasting stream status
> --------------------------------------------------
>
> Key: FLINK-22881
> URL: https://issues.apache.org/jira/browse/FLINK-22881
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Network, Runtime / Task
> Affects Versions: 1.14.0
> Reporter: Piotr Nowojski
> Assignee: Dawid Wysakowicz
> Priority: Critical
> Labels: pull-request-available
> Fix For: 1.14.0
>
>
> On a cluster I observed symptoms of tasks being blocked for long time,
> causing long delays with unaligned checkpointing. 99% of those cases were
> caused by `broadcastEmit` of the stream status
> {noformat}
> 2021-06-04 14:41:44,049 ERROR
> org.apache.flink.runtime.io.network.buffer.LocalBufferPool [] - Blocking
> wait [11059 ms] for an available buffer.
> java.lang.Exception: Stracktracegenerator
> at
> org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegmentBlocking(LocalBufferPool.java:323)
> ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
> at
> org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBuilderBlocking(LocalBufferPool.java:290)
> ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
> at
> org.apache.flink.runtime.io.network.partition.BufferWritingResultPartition.requestNewBufferBuilderFromPool(BufferWritingResultPartition.java:338)
> ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
> at
> org.apache.flink.runtime.io.network.partition.BufferWritingResultPartition.requestNewUnicastBufferBuilder(BufferWritingResultPartition.java:314)
> ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
> at
> org.apache.flink.runtime.io.network.partition.BufferWritingResultPartition.appendUnicastDataForNewRecord(BufferWritingResultPartition.java:246)
> ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
> at
> org.apache.flink.runtime.io.network.partition.BufferWritingResultPartition.emitRecord(BufferWritingResultPartition.java:142)
> ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
> at
> org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:104)
> ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
> at
> org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.broadcastEmit(ChannelSelectorRecordWriter.java:67)
> ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
> at
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.writeStreamStatus(RecordWriterOutput.java:136)
> ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
> at
> org.apache.flink.streaming.runtime.streamstatus.AnnouncedStatus.ensureActive(AnnouncedStatus.java:65)
> ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
> at
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:103)
> ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
> at
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:90)
> ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
> at
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:44)
> ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
> at
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56)
> ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
> at
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29)
> ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
> at
> org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:38)
> ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
> at
> org.apache.flink.streaming.runtime.tasks.ChainingOutput.pushToOperator(ChainingOutput.java:101)
> ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
> at
> org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:82)
> ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
> at
> org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:39)
> ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
> at
> org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask$AsyncDataOutputToOutput.emitRecord(SourceOperatorStreamTask.java:182)
> ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
> at
> org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:110)
> ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
> at
> org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:101)
> ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
> at
> org.apache.flink.api.connector.source.lib.util.IteratorSourceReader.pollNext(IteratorSourceReader.java:98)
> ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
> at
> org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:294)
> ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
> at
> org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:69)
> ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
> at
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66)
> ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:422)
> ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
> at
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204)
> ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:680)
> ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:635)
> ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:646)
> [flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:619)
> [flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779)
> [flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
> [flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
> at java.lang.Thread.run(Thread.java:748) [?:1.8.0_282]
> {noformat}
> *I have seen this happening both in source and network tasks.* ~80% cases
> were in the source tasks
> {{broadcastEmit}} can easily bypass our non blocking checks. There are two
> questions:
> # why is the stream idling so much? It’s like almost every millisecond it’s
> broadcasting status
> # should we optimise this? Broadcasting CBs and other events is not an issue,
> as those are events that do not request/require buffers
--
This message was sent by Atlassian Jira
(v8.3.4#803005)