[ https://issues.apache.org/jira/browse/FLINK-20618?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17251746#comment-17251746 ]
Piotr Nowojski commented on FLINK-20618:
----------------------------------------
Thanks for sharing those [~zlzhang0122]. Let me know if Flink 1.12 solves the
problem.
After looking deeper, I can confirm that something is wrong, as you
described initially. One upstream subpartition has hundreds of buffers ready to
be sent, but it has no credits to send them. The respective downstream
{{RemoteInputChannel}} is empty and has 0 unannounced credits. Moreover, as
you stated above, the sequence numbers do not match. The upstream subpartition
is 2 buffers ahead, so it looks like 2 buffers are missing or were lost in the
transfer.
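For readers less familiar with credit-based flow control, the stuck state
described above can be modelled roughly as follows. This is a toy sketch under
stated assumptions, not Flink's implementation: the names {{Sender}},
{{Receiver}}, {{run}} and {{lostInTransit}} are hypothetical, and "losing" the
first buffers is only one scenario that would be consistent with the
observation, not a diagnosis. It shows why a sender with a backlog cannot make
progress once its credits are spent and the receiver has nothing left to
announce.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Toy model of credit-based flow control. These are NOT Flink classes;
// every name here is hypothetical and exists only for illustration.
final class CreditFlowSketch {

    /** Stand-in for an upstream subpartition. */
    static final class Sender {
        final Deque<Integer> backlog = new ArrayDeque<>(); // buffers ready to be sent
        int credits;  // credits announced by the receiver and not yet used
        int nextSeq;  // sequence number assigned to the next sent buffer

        /** Sends one buffer if a credit is available; returns its sequence number, or -1. */
        int trySend() {
            if (credits == 0 || backlog.isEmpty()) {
                return -1;
            }
            credits--;
            backlog.poll();
            return nextSeq++;
        }
    }

    /** Stand-in for a downstream input channel. */
    static final class Receiver {
        int unannouncedCredits; // free buffers the sender has not been told about yet
        int expectedSeq;        // sequence number of the next buffer it expects

        /** Hands all unannounced credits to the sender. */
        void announceTo(Sender sender) {
            sender.credits += unannouncedCredits;
            unannouncedCredits = 0;
        }

        /** Consumes one buffer; recycling it frees one credit. */
        void receive() {
            expectedSeq++;
            unannouncedCredits++;
        }
    }

    /**
     * Alternates announce and send rounds until nothing moves.
     * The first lostInTransit buffers are sent but never arrive.
     * Returns the number of buffers delivered.
     */
    static int run(Sender sender, Receiver receiver, int lostInTransit) {
        int delivered = 0;
        boolean progress = true;
        while (progress) {
            progress = false;
            receiver.announceTo(sender);
            while (sender.trySend() >= 0) {
                progress = true;
                if (lostInTransit > 0) {
                    lostInTransit--; // the credit was spent but never comes back
                } else {
                    receiver.receive();
                    delivered++;
                }
            }
        }
        return delivered;
    }

    public static void main(String[] args) {
        Sender sender = new Sender();
        for (int i = 0; i < 100; i++) {
            sender.backlog.add(i);
        }
        Receiver receiver = new Receiver();
        receiver.unannouncedCredits = 2; // two initial credits

        int delivered = run(sender, receiver, 2);
        System.out.println("delivered=" + delivered
                + " backlog=" + sender.backlog.size()
                + " senderCredits=" + sender.credits
                + " unannounced=" + receiver.unannouncedCredits
                + " seqGap=" + (sender.nextSeq - receiver.expectedSeq));
    }
}
```

With two initial credits and two lost buffers, the model ends with
delivered=0, backlog=98, senderCredits=0, unannounced=0 and seqGap=2: the
sender is two sequence numbers ahead and neither side can move. With
lostInTransit set to 0, all 100 buffers are delivered.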
1. One side question: did this query work for you in some previous
Flink version?
This looks strange, and from reading the code I cannot find a reason for such
behaviour. It would be very surprising for this kind of low-level bug to have
been in the code base for many releases without anyone noticing it. Network
stack bugs are virtually always discovered very quickly by our automated tests.
2. I noticed you are using a custom source function, {{Talos}}. Have you
implemented it correctly? Are you acquiring the checkpoint lock in
every place that emits records? Maybe you could share some of the code of
this source?
3. Is something interfering with the JVM processes? Are you sure that nobody is
interrupting the JVM? Some kind of script/watchdog/admin? Maybe your {{Talos}}
source is ignoring such interrupts, but they are leaving Flink's internal
code in an undefined state?
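As a rough illustration of what questions 2 and 3 are asking about, a legacy
source's fetch loop would typically emit every record while holding the
checkpoint lock and would stop when it sees an interrupt instead of swallowing
it. The sketch below is self-contained and makes several assumptions: its
{{SourceContext}} is a minimal stand-in that mirrors only the {{collect}} and
{{getCheckpointLock}} methods of Flink's {{SourceFunction.SourceContext}}, and
{{FetchLoop}} and {{RecordingContext}} are hypothetical names, not the
{{Talos}} source's real code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Self-contained sketch. SourceContext is a stand-in that mirrors two methods of
// Flink's SourceFunction.SourceContext; it is NOT the real Flink interface.
final class CheckpointLockSketch {

    interface SourceContext<T> {
        void collect(T element);
        Object getCheckpointLock();
    }

    /** Hypothetical fetch loop of a custom legacy source. */
    static final class FetchLoop {
        private final SourceContext<String> ctx;
        private volatile boolean running = true;
        private int offset; // state that a checkpoint would snapshot

        FetchLoop(SourceContext<String> ctx) {
            this.ctx = ctx;
        }

        /** Emits records until done, cancelled, or interrupted; returns the final offset. */
        int run(List<String> records) {
            while (running && offset < records.size()) {
                if (Thread.currentThread().isInterrupted()) {
                    // Stop, and leave the flag set so the caller also sees the interrupt.
                    break;
                }
                // Emit the record and advance the offset under the same lock, so a
                // checkpoint never observes one without the other.
                synchronized (ctx.getCheckpointLock()) {
                    ctx.collect(records.get(offset));
                    offset++;
                }
            }
            return offset;
        }

        void cancel() {
            running = false;
        }
    }

    /** Test double that rejects any emission made without holding the lock. */
    static final class RecordingContext implements SourceContext<String> {
        final Object lock = new Object();
        final List<String> out = new ArrayList<>();

        @Override
        public void collect(String element) {
            if (!Thread.holdsLock(lock)) {
                throw new IllegalStateException("emitted without the checkpoint lock");
            }
            out.add(element);
        }

        @Override
        public Object getCheckpointLock() {
            return lock;
        }
    }

    public static void main(String[] args) {
        RecordingContext ctx = new RecordingContext();
        int offset = new FetchLoop(ctx).run(Arrays.asList("a", "b", "c"));
        System.out.println("offset=" + offset + " emitted=" + ctx.out);
    }
}
```

The {{RecordingContext}} check is one cheap way to find an emit path that
bypasses the lock: any {{collect}} call made outside the {{synchronized}}
block fails immediately instead of corrupting state silently.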
> Some of the source operator subtasks will stuck when flink job in critical
> backpressure
> ---------------------------------------------------------------------------------------
>
> Key: FLINK-20618
> URL: https://issues.apache.org/jira/browse/FLINK-20618
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Network
> Affects Versions: 1.10.0, 1.11.1
> Reporter: zlzhang0122
> Priority: Critical
> Attachments: 2020-12-16 11-47-37 的屏幕截图.png, 2020-12-16 11-48-30
> 的屏幕截图.png, 2020-12-16 11-49-01 的屏幕截图.png, 2020-12-16 11-53-42 的屏幕截图.png,
> 2020-12-17 11-10-06 的屏幕截图.png, 2020-12-17 16-45-00 的屏幕截图.png, stuck_node.txt,
> stuck_node_downstream.txt
>
>
> In some situations with severe backpressure, some of the source subtasks
> block while requesting a buffer because the LocalBufferPool is full, so the
> whole task gets stuck while the other tasks keep running well.
> Below is the jstack trace:
>
> Legacy Source Thread - Source: TalosTableSource(hash, timestamp, step,
> isCustomize, hostname, endpoint, metric, dsType, orgId, idc, labels, pdl)
> -> SourceConversion(table=[default_catalog.default_database.transfer_c5,
> source: [TalosTableSource(hash, timestamp, step, isCustomize, hostname,
> endpoint, metric, dsType, orgId, idc, labels, pdl)]], fields=[hash,
> timestamp, step, isCustomize, hostname, endpoint, metric, dsType, orgId, idc,
> labels, pdl]) -> Calc(select=[hash, timestamp, step, isCustomize,
> hostname, endpoint, metric, dsType, orgId, idc, labels, pdl, () AS $12,
> (timestamp - ((timestamp + 18000) MOD 86400)) AS $13]) (62/128) #113 prio=5
> os_prio=0 tid=0x00007f43d07e1800 nid=0x1b1c waiting on condition
> [0x00007f43b8488000]
> java.lang.Thread.State: WAITING (parking)
> at sun.misc.Unsafe.park(Native Method)
> - parking to wait for <0x00000000db234488> (a
> java.util.concurrent.CompletableFuture$Signaller)
> at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
> at
> java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1693)
> at java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
> at
> java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1729)
> at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
> at
> org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegmentBlocking(LocalBufferPool.java:231)
> at
> org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBuilderBlocking(LocalBufferPool.java:209)
> at
> org.apache.flink.runtime.io.network.partition.ResultPartition.getBufferBuilder(ResultPartition.java:189)
> at
> org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.requestNewBufferBuilder(ChannelSelectorRecordWriter.java:103)
> at
> org.apache.flink.runtime.io.network.api.writer.RecordWriter.copyFromSerializerToTargetChannel(RecordWriter.java:145)
> at
> org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:116)
> at
> org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.emit(ChannelSelectorRecordWriter.java:60)
> at
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:107)
> at
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89)
> at
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:732)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:710)
> at StreamExecCalc$33.processElement(Unknown Source)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:641)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:616)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:596)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:732)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:710)
> at SourceConversion$4.processElement(Unknown Source)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:641)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:616)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:596)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:732)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:710)
> at
> org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
> - locked <0x00000000d8d50fa8> (a java.lang.Object)
> at
> org.apache.flink.streaming.connectors.talos.internals.AbstractFetcher.emitRecord(AbstractFetcher.java:379)
> - locked <0x00000000d8d50fa8> (a java.lang.Object)
> at
> org.apache.flink.streaming.connectors.talos.internals.TalosFetcher2.runFetchLoop(TalosFetcher2.java:249)
> at
> org.apache.flink.streaming.connectors.talos.FlinkTalosConsumerBase.run(FlinkTalosConsumerBase.java:758)
> at
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
> at
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
> at
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:196)
>
>
>
> Source: TalosTableSource(hash, timestamp, step, isCustomize, hostname,
> endpoint, metric, dsType, orgId, idc, labels, pdl) ->
> SourceConversion(table=[default_catalog.default_database.transfer_c5, source:
> [TalosTableSource(hash, timestamp, step, isCustomize, hostname, endpoint,
> metric, dsType, orgId, idc, labels, pdl)]], fields=[hash, timestamp, step,
> isCustomize, hostname, endpoint, metric, dsType, orgId, idc, labels, pdl])
> -> Calc(select=[hash, timestamp, step, isCustomize, hostname, endpoint,
> metric, dsType, orgId, idc, labels, pdl, () AS $12, (timestamp - ((timestamp
> + 18000) MOD 86400)) AS $13]) (62/128) #108 prio=5 os_prio=0
> tid=0x00007f44dc178000 nid=0x1332 waiting for monitor entry
> [0x00007f443dfd8000]
> java.lang.Thread.State: BLOCKED (on object monitor)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.run(StreamTaskActionExecutor.java:86)
> - waiting to lock <0x00000000d8d50fa8> (a java.lang.Object)
> at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:78)
> at
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:261)
> at
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:487)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:470)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
> at java.lang.Thread.run(Thread.java:748)