[
https://issues.apache.org/jira/browse/FLINK-20618?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17250255#comment-17250255
]
Piotr Nowojski commented on FLINK-20618:
----------------------------------------
I do not see anything immediately suspicious here. It's expected behaviour for
the any tasks to eventually be blocked in case of backpressure. As long as the
tasks unblock themselves once the backpressure is gone, and the job can make a
progress, there is no problem.
As [~roman_khachatryan] wrote, a backpressured task will be blocked waiting for
the backpressure to go away to make some progress. Waiting can be:
# non blocking (waiting on mailbox, task thread is idling)
# blocking (task is waiting on requesting a buffer, task thread is blocked)
The second path is "legacy" one, which we are trying to avoid as best as we
can. It's not always possible. For example, for old/legacy source tasks ([new
source
interface|https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface]
doesn't have this problem), the second path is always used.
Regarding the sequence numbers. Sent sequence number is from the upstream task
({{CreditBasedSequenceNumberingViewReader}}), expected sequence number is from
the downstream task ({{RemoteInputChannel}}). It looks like in your screenshots
one buffer is somewhere in transfer. I wouldn't worry about it, as
[~roman_khachatryan] wrote, if there is a problem in this area, it would fail
with {{BufferReorderingException}}.
> Some of the source operator subtasks will stuck when flink job in critical
> backpressure
> ---------------------------------------------------------------------------------------
>
> Key: FLINK-20618
> URL: https://issues.apache.org/jira/browse/FLINK-20618
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Network
> Affects Versions: 1.11.0, 1.10.2
> Reporter: zlzhang0122
> Priority: Critical
> Attachments: 2020-12-16 11-47-37 的屏幕截图.png, 2020-12-16 11-48-30
> 的屏幕截图.png, 2020-12-16 11-49-01 的屏幕截图.png, 2020-12-16 11-53-42 的屏幕截图.png
>
>
> In some critical backpressure situation, some of the subtasks of source will
> blocked to request buffer because of the LocalBufferPool is full,so the whole
> task will be stuck and the other task run well.
> Bellow is the jstack trace:
>
> Legacy Source Thread - Source: TalosTableSource(hash, timestamp, step,
> isCustomize, hostname, endpoint, metric, dsType, orgId, idc, labels, pdl)
> -> SourceConversion(table=[default_catalog.default_database.transfer_c5,
> source: [TalosTableSource(hash, timestamp, step, isCustomize, hostname,
> endpoint, metric, dsType, orgId, idc, labels, pdl)]], fields=[hash,
> timestamp, step, isCustomize, hostname, endpoint, metric, dsType, orgId, idc,
> labels, pdl]) -> Calc(select=[hash, timestamp, step, isCustomize,
> hostname, endpoint, metric, dsType, orgId, idc, labels, pdl, () AS $12,
> (timestamp - ((timestamp + 18000) MOD 86400)) AS $13]) (62/128) #113 prio=5
> os_prio=0 tid=0x00007f43d07e1800 nid=0x1b1c waiting on condition
> [0x00007f43b8488000]
> java.lang.Thread.State: WAITING (parking)
> at sun.misc.Unsafe.park(Native Method)
> - parking to wait for <0x00000000db234488> (a
> java.util.concurrent.CompletableFuture$Signaller)
> at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
> at
> java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1693)
> at java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
> at
> java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1729)
> at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
> at
> org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegmentBlocking(LocalBufferPool.java:231)
> at
> org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBuilderBlocking(LocalBufferPool.java:209)
> at
> org.apache.flink.runtime.io.network.partition.ResultPartition.getBufferBuilder(ResultPartition.java:189)
> at
> org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.requestNewBufferBuilder(ChannelSelectorRecordWriter.java:103)
> at
> org.apache.flink.runtime.io.network.api.writer.RecordWriter.copyFromSerializerToTargetChannel(RecordWriter.java:145)
> at
> org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:116)
> at
> org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.emit(ChannelSelectorRecordWriter.java:60)
> at
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:107)
> at
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89)
> at
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:732)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:710)
> at StreamExecCalc$33.processElement(Unknown Source)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:641)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:616)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:596)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:732)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:710)
> at SourceConversion$4.processElement(Unknown Source)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:641)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:616)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:596)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:732)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:710)
> at
> org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
> - locked <0x00000000d8d50fa8> (a java.lang.Object)
> at
> org.apache.flink.streaming.connectors.talos.internals.AbstractFetcher.emitRecord(AbstractFetcher.java:379)
> - locked <0x00000000d8d50fa8> (a java.lang.Object)
> at
> org.apache.flink.streaming.connectors.talos.internals.TalosFetcher2.runFetchLoop(TalosFetcher2.java:249)
> at
> org.apache.flink.streaming.connectors.talos.FlinkTalosConsumerBase.run(FlinkTalosConsumerBase.java:758)
> at
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
> at
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
> at
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:196)
>
>
>
> Source: TalosTableSource(hash, timestamp, step, isCustomize, hostname,
> endpoint, metric, dsType, orgId, idc, labels, pdl) ->
> SourceConversion(table=[default_catalog.default_database.transfer_c5, source:
> [TalosTableSource(hash, timestamp, step, isCustomize, hostname, endpoint,
> metric, dsType, orgId, idc, labels, pdl)]], fields=[hash, timestamp, step,
> isCustomize, hostname, endpoint, metric, dsType, orgId, idc, labels, pdl])
> -> Calc(select=[hash, timestamp, step, isCustomize, hostname, endpoint,
> metric, dsType, orgId, idc, labels, pdl, () AS $12, (timestamp - ((timestamp
> + 18000) MOD 86400)) AS $13]) (62/128) #108 prio=5 os_prio=0
> tid=0x00007f44dc178000 nid=0x1332 waiting for monitor entry
> [0x00007f443dfd8000]
> java.lang.Thread.State: BLOCKED (on object monitor)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.run(StreamTaskActionExecutor.java:86)
> - waiting to lock <0x00000000d8d50fa8> (a java.lang.Object)
> at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:78)
> at
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:261)
> at
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:487)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:470)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
> at java.lang.Thread.run(Thread.java:748)
--
This message was sent by Atlassian Jira
(v8.3.4#803005)