[
https://issues.apache.org/jira/browse/FLINK-20618?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17250928#comment-17250928
]
Piotr Nowojski commented on FLINK-20618:
----------------------------------------
Thanks for the quick response.
1. Which Flink version exactly are you using, and which versions have you
tried? Is it an official, unmodified Flink build?
2. Could you share a couple of stack traces (taken one after another) from the
Task's thread of the downstream Task, the one that is supposed to be reading
from this subpartition? Just to make sure it is indeed making progress, and to
see what it is doing.
3a. Could you maybe share a memory dump of the downstream task (the one that
was supposed to be reading from this blocked subtask), and maybe of the blocked
subtask as well?
3b. If not, could you at least check the state of the
{{RemoteInputChannel}}, {{SingleInputGate}} (and potentially
{{UnionInputGate}}) of the downstream task? For example, on one of the
screenshots, the one with expected sequence number {{330959}}, you can see
{{ArrayDeque receivedBuffers}}. Is it empty? If not, is this
{{RemoteInputChannel}} enqueued in {{SingleInputGate#inputChannelsWithData}}
and {{UnionInputGate#inputGatesWithData}}?
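
To illustrate what point 2 is after: comparing several consecutive stack
samples of the same thread shows whether it keeps sitting in exactly the same
frames. On a real TaskManager you would normally just run the JDK's {{jstack}}
against the process a few times in a row. The sketch below does the same thing
in-process with plain JDK calls, only to show the idea. The class name, the
thread name and the helper methods are hypothetical and not part of Flink.
Identical samples are only a hint that the thread is stuck, not proof.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class StackSampler {

    /** Takes `count` stack samples of `target`, `intervalMillis` apart. */
    static List<StackTraceElement[]> sample(Thread target, int count, long intervalMillis)
            throws InterruptedException {
        List<StackTraceElement[]> samples = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            samples.add(target.getStackTrace());
            if (i < count - 1) {
                Thread.sleep(intervalMillis);
            }
        }
        return samples;
    }

    /** True if every sample has exactly the same frames as the first one. */
    static boolean allIdentical(List<StackTraceElement[]> samples) {
        for (StackTraceElement[] s : samples) {
            if (!Arrays.equals(samples.get(0), s)) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) throws Exception {
        // Stand-in for a task thread that never makes progress (hypothetical).
        Object neverNotified = new Object();
        Thread stuck = new Thread(() -> {
            synchronized (neverNotified) {
                try {
                    neverNotified.wait();
                } catch (InterruptedException ignored) {
                    // exit quietly
                }
            }
        }, "hypothetical-downstream-task");
        stuck.setDaemon(true);
        stuck.start();
        Thread.sleep(200); // give the thread time to reach wait()

        List<StackTraceElement[]> samples = sample(stuck, 3, 100);
        System.out.println("samples identical: " + allIdentical(samples));
    }
}
```

If the samples differ from one dump to the next, the thread is doing work
between dumps, which is the "making progress" case asked about above.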
> Some of the source operator subtasks will stuck when flink job in critical
> backpressure
> ---------------------------------------------------------------------------------------
>
> Key: FLINK-20618
> URL: https://issues.apache.org/jira/browse/FLINK-20618
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Network
> Affects Versions: 1.11.0, 1.10.2
> Reporter: zlzhang0122
> Priority: Critical
> Attachments: 2020-12-16 11-47-37 的屏幕截图.png, 2020-12-16 11-48-30
> 的屏幕截图.png, 2020-12-16 11-49-01 的屏幕截图.png, 2020-12-16 11-53-42 的屏幕截图.png,
> 2020-12-17 11-10-06 的屏幕截图.png, 2020-12-17 16-45-00 的屏幕截图.png
>
>
> Under severe backpressure, some of the source subtasks block while
> requesting a buffer because the LocalBufferPool is full, so the whole task
> gets stuck while the other tasks run fine.
> Below is the jstack trace:
>
> Legacy Source Thread - Source: TalosTableSource(hash, timestamp, step,
> isCustomize, hostname, endpoint, metric, dsType, orgId, idc, labels, pdl)
> -> SourceConversion(table=[default_catalog.default_database.transfer_c5,
> source: [TalosTableSource(hash, timestamp, step, isCustomize, hostname,
> endpoint, metric, dsType, orgId, idc, labels, pdl)]], fields=[hash,
> timestamp, step, isCustomize, hostname, endpoint, metric, dsType, orgId, idc,
> labels, pdl]) -> Calc(select=[hash, timestamp, step, isCustomize,
> hostname, endpoint, metric, dsType, orgId, idc, labels, pdl, () AS $12,
> (timestamp - ((timestamp + 18000) MOD 86400)) AS $13]) (62/128) #113 prio=5
> os_prio=0 tid=0x00007f43d07e1800 nid=0x1b1c waiting on condition
> [0x00007f43b8488000]
> java.lang.Thread.State: WAITING (parking)
> at sun.misc.Unsafe.park(Native Method)
> - parking to wait for <0x00000000db234488> (a java.util.concurrent.CompletableFuture$Signaller)
> at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
> at java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1693)
> at java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
> at java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1729)
> at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
> at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegmentBlocking(LocalBufferPool.java:231)
> at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBuilderBlocking(LocalBufferPool.java:209)
> at org.apache.flink.runtime.io.network.partition.ResultPartition.getBufferBuilder(ResultPartition.java:189)
> at org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.requestNewBufferBuilder(ChannelSelectorRecordWriter.java:103)
> at org.apache.flink.runtime.io.network.api.writer.RecordWriter.copyFromSerializerToTargetChannel(RecordWriter.java:145)
> at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:116)
> at org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.emit(ChannelSelectorRecordWriter.java:60)
> at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:107)
> at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89)
> at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:732)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:710)
> at StreamExecCalc$33.processElement(Unknown Source)
> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:641)
> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:616)
> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:596)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:732)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:710)
> at SourceConversion$4.processElement(Unknown Source)
> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:641)
> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:616)
> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:596)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:732)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:710)
> at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
> - locked <0x00000000d8d50fa8> (a java.lang.Object)
> at org.apache.flink.streaming.connectors.talos.internals.AbstractFetcher.emitRecord(AbstractFetcher.java:379)
> - locked <0x00000000d8d50fa8> (a java.lang.Object)
> at org.apache.flink.streaming.connectors.talos.internals.TalosFetcher2.runFetchLoop(TalosFetcher2.java:249)
> at org.apache.flink.streaming.connectors.talos.FlinkTalosConsumerBase.run(FlinkTalosConsumerBase.java:758)
> at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
> at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
> at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:196)
>
>
>
> Source: TalosTableSource(hash, timestamp, step, isCustomize, hostname,
> endpoint, metric, dsType, orgId, idc, labels, pdl) ->
> SourceConversion(table=[default_catalog.default_database.transfer_c5, source:
> [TalosTableSource(hash, timestamp, step, isCustomize, hostname, endpoint,
> metric, dsType, orgId, idc, labels, pdl)]], fields=[hash, timestamp, step,
> isCustomize, hostname, endpoint, metric, dsType, orgId, idc, labels, pdl])
> -> Calc(select=[hash, timestamp, step, isCustomize, hostname, endpoint,
> metric, dsType, orgId, idc, labels, pdl, () AS $12, (timestamp - ((timestamp
> + 18000) MOD 86400)) AS $13]) (62/128) #108 prio=5 os_prio=0
> tid=0x00007f44dc178000 nid=0x1332 waiting for monitor entry
> [0x00007f443dfd8000]
> java.lang.Thread.State: BLOCKED (on object monitor)
> at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.run(StreamTaskActionExecutor.java:86)
> - waiting to lock <0x00000000d8d50fa8> (a java.lang.Object)
> at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:78)
> at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:261)
> at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:487)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:470)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
> at java.lang.Thread.run(Thread.java:748)
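
Read together, the two traces above show one pattern: the legacy source thread
holds monitor <0x00000000d8d50fa8> (the "- locked" lines) while it is parked in
CompletableFuture.get inside LocalBufferPool.requestMemorySegmentBlocking, and
the task thread is BLOCKED in SynchronizedStreamTaskActionExecutor.run waiting
to lock that same monitor. The standalone sketch below reproduces just these
two thread states with plain JDK primitives. It is not Flink code, and all
class, method and thread names in it are hypothetical; the CompletableFuture
merely stands in for "a buffer became available".

```java
import java.util.concurrent.CompletableFuture;

class LockWhileWaitingDemo {

    /** Polls until `t` reaches `wanted` or the timeout elapses; returns the last state seen. */
    static Thread.State awaitState(Thread t, Thread.State wanted, long timeoutMillis)
            throws InterruptedException {
        long deadline = System.currentTimeMillis() + timeoutMillis;
        while (t.getState() != wanted && System.currentTimeMillis() < deadline) {
            Thread.sleep(10);
        }
        return t.getState();
    }

    /** Returns {state of the "source" thread, state of the "task" thread}. */
    static Thread.State[] reproduce() throws InterruptedException {
        Object lock = new Object();
        CompletableFuture<Void> bufferAvailable = new CompletableFuture<>();

        // Holds the lock while waiting for a "buffer", like the source thread in the trace.
        Thread source = new Thread(() -> {
            synchronized (lock) {
                bufferAvailable.join();
            }
        }, "hypothetical-source-thread");

        // Needs the same lock to run its action, like the task thread in the trace.
        Thread task = new Thread(() -> {
            synchronized (lock) {
                // nothing to do; acquiring the lock is the point
            }
        }, "hypothetical-task-thread");

        source.start();
        Thread.State sourceState = awaitState(source, Thread.State.WAITING, 2000);
        task.start();
        Thread.State taskState = awaitState(task, Thread.State.BLOCKED, 2000);

        // Release the "buffer" so both threads can finish.
        bufferAvailable.complete(null);
        source.join();
        task.join();
        return new Thread.State[] {sourceState, taskState};
    }

    public static void main(String[] args) throws Exception {
        Thread.State[] states = reproduce();
        System.out.println("source thread: " + states[0]);
        System.out.println("task thread:   " + states[1]);
    }
}
```

In the sketch both threads finish once the future is completed. In the reported
job the equivalent event would be a buffer being returned to the
LocalBufferPool, which depends on the downstream side consuming data.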