[ https://issues.apache.org/jira/browse/FLINK-20618?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
zlzhang0122 updated FLINK-20618:
--------------------------------
Attachment: stuck_node_downstream.txt
> Some of the source operator subtasks get stuck when the Flink job is under critical
> backpressure
> ---------------------------------------------------------------------------------------
>
> Key: FLINK-20618
> URL: https://issues.apache.org/jira/browse/FLINK-20618
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Network
> Affects Versions: 1.11.0, 1.10.2
> Reporter: zlzhang0122
> Priority: Critical
> Attachments: 2020-12-16 11-47-37 的屏幕截图.png, 2020-12-16 11-48-30 的屏幕截图.png,
> 2020-12-16 11-49-01 的屏幕截图.png, 2020-12-16 11-53-42 的屏幕截图.png,
> 2020-12-17 11-10-06 的屏幕截图.png, 2020-12-17 16-45-00 的屏幕截图.png,
> stuck_node.txt, stuck_node_downstream.txt
> (的屏幕截图 = "screenshot")
>
>
> In some critical backpressure situations, some subtasks of the source block
> while requesting a buffer because the LocalBufferPool has no buffer available,
> so the whole task gets stuck while the other tasks keep running normally.
> Below are the relevant jstack traces:
>
> Legacy Source Thread - Source: TalosTableSource(hash, timestamp, step,
> isCustomize, hostname, endpoint, metric, dsType, orgId, idc, labels, pdl)
> -> SourceConversion(table=[default_catalog.default_database.transfer_c5,
> source: [TalosTableSource(hash, timestamp, step, isCustomize, hostname,
> endpoint, metric, dsType, orgId, idc, labels, pdl)]], fields=[hash,
> timestamp, step, isCustomize, hostname, endpoint, metric, dsType, orgId, idc,
> labels, pdl]) -> Calc(select=[hash, timestamp, step, isCustomize,
> hostname, endpoint, metric, dsType, orgId, idc, labels, pdl, () AS $12,
> (timestamp - ((timestamp + 18000) MOD 86400)) AS $13]) (62/128) #113 prio=5
> os_prio=0 tid=0x00007f43d07e1800 nid=0x1b1c waiting on condition
> [0x00007f43b8488000]
> java.lang.Thread.State: WAITING (parking)
> at sun.misc.Unsafe.park(Native Method)
> - parking to wait for <0x00000000db234488> (a java.util.concurrent.CompletableFuture$Signaller)
> at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
> at java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1693)
> at java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
> at java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1729)
> at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
> at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegmentBlocking(LocalBufferPool.java:231)
> at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBuilderBlocking(LocalBufferPool.java:209)
> at org.apache.flink.runtime.io.network.partition.ResultPartition.getBufferBuilder(ResultPartition.java:189)
> at org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.requestNewBufferBuilder(ChannelSelectorRecordWriter.java:103)
> at org.apache.flink.runtime.io.network.api.writer.RecordWriter.copyFromSerializerToTargetChannel(RecordWriter.java:145)
> at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:116)
> at org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.emit(ChannelSelectorRecordWriter.java:60)
> at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:107)
> at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89)
> at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:732)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:710)
> at StreamExecCalc$33.processElement(Unknown Source)
> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:641)
> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:616)
> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:596)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:732)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:710)
> at SourceConversion$4.processElement(Unknown Source)
> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:641)
> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:616)
> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:596)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:732)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:710)
> at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
> - locked <0x00000000d8d50fa8> (a java.lang.Object)
> at org.apache.flink.streaming.connectors.talos.internals.AbstractFetcher.emitRecord(AbstractFetcher.java:379)
> - locked <0x00000000d8d50fa8> (a java.lang.Object)
> at org.apache.flink.streaming.connectors.talos.internals.TalosFetcher2.runFetchLoop(TalosFetcher2.java:249)
> at org.apache.flink.streaming.connectors.talos.FlinkTalosConsumerBase.run(FlinkTalosConsumerBase.java:758)
> at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
> at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
> at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:196)
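The first trace shows the legacy source thread parked (WAITING) in LocalBufferPool.requestMemorySegmentBlocking, which calls CompletableFuture.get() and waits until the pool can hand out a buffer again. The sketch below is a hypothetical, heavily simplified model of that blocking-request pattern, assuming a pool of fixed-size byte[] buffers and an availability future that is completed when a buffer is recycled. SimpleBufferPool and its methods are illustrative names only, not Flink's LocalBufferPool API.

```java
import java.util.ArrayDeque;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

/**
 * Hypothetical, simplified model of a bounded buffer pool whose blocking
 * request parks on a CompletableFuture until a buffer is recycled.
 * This is NOT Flink's LocalBufferPool; all names are illustrative.
 */
final class SimpleBufferPool {
    private final ArrayDeque<byte[]> available = new ArrayDeque<>();
    // Completed while buffers may be available; replaced by an incomplete
    // future once a request finds the pool empty.
    private CompletableFuture<Void> availableFuture = CompletableFuture.completedFuture(null);

    public SimpleBufferPool(int numBuffers, int bufferSize) {
        for (int i = 0; i < numBuffers; i++) {
            available.add(new byte[bufferSize]);
        }
    }

    /** Non-blocking request: returns null if no buffer is free. */
    public synchronized byte[] requestBuffer() {
        byte[] buffer = available.poll();
        if (buffer == null && availableFuture.isDone()) {
            // Pool is exhausted: callers must now wait for a recycle().
            availableFuture = new CompletableFuture<>();
        }
        return buffer;
    }

    /** Blocking request: parks on the availability future while the pool is empty. */
    public byte[] requestBufferBlocking() throws InterruptedException {
        while (true) {
            byte[] buffer = requestBuffer();
            if (buffer != null) {
                return buffer;
            }
            CompletableFuture<Void> future;
            synchronized (this) {
                future = availableFuture;
            }
            try {
                future.get(); // WAITING (parking), like the top frames of the trace
            } catch (ExecutionException e) {
                throw new IllegalStateException(e);
            }
            // Woken up: loop and retry, another requester may have won the buffer.
        }
    }

    /** Returns a buffer to the pool and wakes up any blocked requesters. */
    public void recycle(byte[] buffer) {
        CompletableFuture<Void> toComplete;
        synchronized (this) {
            available.add(buffer);
            toComplete = availableFuture;
        }
        toComplete.complete(null);
    }
}
```

While the pool is empty, a thread inside requestBufferBlocking() appears in jstack as WAITING (parking) on a CompletableFuture$Signaller, and it resumes only when some other party calls recycle(). Completing the future outside the pool's monitor keeps woken threads from immediately contending on that monitor.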
>
>
>
> Source: TalosTableSource(hash, timestamp, step, isCustomize, hostname,
> endpoint, metric, dsType, orgId, idc, labels, pdl) ->
> SourceConversion(table=[default_catalog.default_database.transfer_c5, source:
> [TalosTableSource(hash, timestamp, step, isCustomize, hostname, endpoint,
> metric, dsType, orgId, idc, labels, pdl)]], fields=[hash, timestamp, step,
> isCustomize, hostname, endpoint, metric, dsType, orgId, idc, labels, pdl])
> -> Calc(select=[hash, timestamp, step, isCustomize, hostname, endpoint,
> metric, dsType, orgId, idc, labels, pdl, () AS $12, (timestamp - ((timestamp
> + 18000) MOD 86400)) AS $13]) (62/128) #108 prio=5 os_prio=0
> tid=0x00007f44dc178000 nid=0x1332 waiting for monitor entry
> [0x00007f443dfd8000]
> java.lang.Thread.State: BLOCKED (on object monitor)
> at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.run(StreamTaskActionExecutor.java:86)
> - waiting to lock <0x00000000d8d50fa8> (a java.lang.Object)
> at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:78)
> at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:261)
> at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:487)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:470)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
> at java.lang.Thread.run(Thread.java:748)
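Both traces refer to the same monitor, <0x00000000d8d50fa8>. The legacy source thread (#113) acquired it in AbstractFetcher.emitRecord and StreamSourceContexts$NonTimestampContext.collect, then parked waiting for a buffer while still holding it; the task thread (#108) is BLOCKED in SynchronizedStreamTaskActionExecutor.run waiting to lock that same object, so the task's mailbox loop is stalled until the source thread obtains a buffer and releases the lock. The hypothetical Java sketch below reproduces that interaction with plain threads. The lock (presumably Flink's checkpoint lock in the real job), the thread names "legacy-source" and "mailbox", and the class LockHeldWhileParked are illustrative stand-ins, not Flink classes; ThreadMXBean is used to report the lock owner, which is the same information jstack prints.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

/** Hypothetical reproduction: one thread parks while holding a lock another thread needs. */
final class LockHeldWhileParked {
    // Stand-in for the shared lock (the java.lang.Object at 0x...d8d50fa8 in the traces).
    private final Object checkpointLock = new Object();
    // Stand-in for "a buffer became available"; completed only by releaseBuffer().
    private final CompletableFuture<Void> bufferAvailable = new CompletableFuture<>();

    /** "Source" thread: takes the lock, then parks until a buffer is available. */
    public Thread startSourceThread() {
        Thread t = new Thread(() -> {
            synchronized (checkpointLock) {
                try {
                    bufferAvailable.get(); // WAITING (parking) with the lock still held
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } catch (ExecutionException e) {
                    throw new IllegalStateException(e);
                }
            }
        }, "legacy-source");
        t.start();
        return t;
    }

    /** "Mailbox" thread: needs the same lock to run a mail, so it becomes BLOCKED. */
    public Thread startMailboxThread() {
        Thread t = new Thread(() -> {
            synchronized (checkpointLock) {
                // The mail's action would run here.
            }
        }, "mailbox");
        t.start();
        return t;
    }

    /** Simulates a downstream buffer being recycled. */
    public void releaseBuffer() {
        bufferAvailable.complete(null);
    }

    /** Name of the thread owning the monitor that {@code blocked} waits for, or null. */
    public static String lockOwnerOf(Thread blocked) {
        ThreadMXBean mx = ManagementFactory.getThreadMXBean();
        ThreadInfo info = mx.getThreadInfo(blocked.getId());
        return info == null ? null : info.getLockOwnerName();
    }

    /** Polls until {@code t} reaches {@code state} or the timeout elapses. */
    public static boolean waitForState(Thread t, Thread.State state, long timeoutMs)
            throws InterruptedException {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (t.getState() != state) {
            if (System.currentTimeMillis() > deadline) {
                return false;
            }
            Thread.sleep(5);
        }
        return true;
    }
}
```

Until releaseBuffer() is called (the analogue of a downstream buffer being recycled), the mailbox thread stays BLOCKED and lockOwnerOf() returns "legacy-source"; afterwards the source thread leaves its synchronized block and the mailbox thread acquires the lock and finishes.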
--
This message was sent by Atlassian Jira
(v8.3.4#803005)