[ 
https://issues.apache.org/jira/browse/FLINK-20547?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17305878#comment-17305878
 ] 

Yingjie Cao commented on FLINK-20547:
-------------------------------------

[~pnowojski] After reading the code, I think I have found the bug. The 
checkAvailability method allocates buffers from the global pool. It may return 
false even when it has already allocated some buffers, so the available future 
is set as unavailable, which makes the consistency check fail.

The following sequence of events can lead to the error:
 # The setNumBuffers method is called.
 # The checkAvailability method is called.
 # requestMemorySegmentFromGlobal is called and returns no buffer, which means 
the global pool has no available buffers at that moment.
 # requestMemorySegmentFromGlobalWhenAvailable is called. By now the global 
pool has available buffers again, so the registered callback is invoked 
directly, after which new buffers are allocated and the buffer pool becomes 
available.
 # However, the checkAvailability call from step 2 still returns false, so the 
else branch in setNumBuffers runs, the buffer pool is set unavailable, and the 
inconsistency occurs.
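
To make the sequence above easier to follow, here is a minimal single-threaded 
sketch of it. It is only a simplified model, not the actual LocalBufferPool or 
NetworkBufferPool code: the classes GlobalPool and LocalPool, the fields, and 
the "interleaving" hook (which stands in for another thread returning a buffer 
to the global pool between steps 3 and 4) are all hypothetical.

```java
/** Simplified, hypothetical model of the race; these are not the real Flink classes. */
class AvailabilityRaceSketch {

    /** Stand-in for the global pool. */
    static class GlobalPool {
        int free; // number of segments currently available

        boolean tryRequest() {
            if (free > 0) {
                free--;
                return true;
            }
            return false;
        }

        /** Runs the callback right away if a segment is free (later registration is omitted). */
        void whenAvailable(Runnable callback) {
            if (free > 0) {
                callback.run();
            }
        }
    }

    /** Stand-in for the local pool with an availability flag. */
    static class LocalPool {
        private final GlobalPool global;
        private final Runnable interleaving; // simulates what another thread does between steps 3 and 4
        int segments;
        boolean availableFlag;

        LocalPool(GlobalPool global, Runnable interleaving) {
            this.global = global;
            this.interleaving = interleaving;
        }

        boolean checkAvailability() {
            if (segments > 0) {
                return true;
            }
            if (global.tryRequest()) { // step 3: the global pool is empty here
                segments++;
                return true;
            }
            interleaving.run(); // a buffer is returned to the global pool
            global.whenAvailable(() -> { // step 4: the callback fires directly
                if (global.tryRequest()) {
                    segments++;
                    availableFlag = true;
                }
            });
            return false; // stale result: the callback has already allocated a segment
        }

        void setNumBuffers() { // steps 1, 2 and 5
            if (checkAvailability()) {
                availableFlag = true;
            } else {
                availableFlag = false; // overwrites the state set by the callback
            }
        }

        boolean isConsistent() {
            return availableFlag == (segments > 0);
        }
    }

    /** Returns whether the flag matches the real state after setNumBuffers. */
    static boolean isConsistentAfterResize() {
        GlobalPool global = new GlobalPool();
        LocalPool local = new LocalPool(global, () -> global.free++);
        local.setNumBuffers();
        return local.isConsistent();
    }

    public static void main(String[] args) {
        System.out.println("consistent: " + isConsistentAfterResize());
    }
}
```

In this model the pool ends up holding a segment while being marked 
unavailable, so isConsistentAfterResize() reports false, which corresponds to 
the failed consistency check.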

Could you please verify whether I am right? If so, I can prepare a fix PR 
soon, but the scenario is hard to simulate in a test. I wonder if we can fix it 
without test coverage (I can verify the fix with the real-world 8000 * 8000 
job).

> Batch job fails due to the exception in network stack
> -----------------------------------------------------
>
>                 Key: FLINK-20547
>                 URL: https://issues.apache.org/jira/browse/FLINK-20547
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Network
>    Affects Versions: 1.13.0
>            Reporter: Zhilong Hong
>            Assignee: Yingjie Cao
>            Priority: Major
>              Labels: test-stability
>             Fix For: 1.13.0
>
>         Attachments: inconsistent.tar.gz
>
>
> I ran a simple batch job with only two job vertices: a source and a sink.
> Both have a parallelism of 8000. They are connected via all-to-all 
> blocking edges.
> While the sink tasks were running, an exception was raised:
> {code:java}
> 2020-12-09 18:43:48,981 INFO  
> org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Sink: Sink 
> 1 (1595/8000) (08bd4214d6e0dc144e9654f1faaa3b28) switched from RUNNING to 
> FAILED on [masked container name] @ [masked address] (dataPort=47872).
> java.io.IOException: java.lang.IllegalStateException: Inconsistent 
> availability: expected true
>       at 
> org.apache.flink.runtime.io.network.partition.consumer.InputChannel.checkError(InputChannel.java:232)
>  ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
>       at 
> org.apache.flink.runtime.io.network.partition.consumer.RecoveredInputChannel.getNextBuffer(RecoveredInputChannel.java:165)
>  ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
>       at 
> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.waitAndGetNextData(SingleInputGate.java:626)
>  ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
>       at 
> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:603)
>  ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
>       at 
> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.pollNext(SingleInputGate.java:591)
>  ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
>       at 
> org.apache.flink.runtime.taskmanager.InputGateWithMetrics.pollNext(InputGateWithMetrics.java:109)
>  ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
>       at 
> org.apache.flink.streaming.runtime.io.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:142)
>  ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
>       at 
> org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:157)
>  ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
>       at 
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
>  ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:372)
>  ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
>       at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186)
>  ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:575)
>  ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:539)
>  ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
>       at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722) 
> ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
>       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547) 
> ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
>       at java.lang.Thread.run(Thread.java:834) ~[?:1.8.0_102]
> Caused by: java.lang.IllegalStateException: Inconsistent availability: 
> expected true
>       at 
> org.apache.flink.util.Preconditions.checkState(Preconditions.java:198) 
> ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
>       at 
> org.apache.flink.runtime.io.network.buffer.LocalBufferPool.checkConsistentAvailability(LocalBufferPool.java:434)
>  ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
>       at 
> org.apache.flink.runtime.io.network.buffer.LocalBufferPool.setNumBuffers(LocalBufferPool.java:564)
>  ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
>       at 
> org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.redistributeBuffers(NetworkBufferPool.java:509)
>  ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
>       at 
> org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.tryRedistributeBuffers(NetworkBufferPool.java:438)
>  ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
>       at 
> org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.requestMemorySegments(NetworkBufferPool.java:166)
>  ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
>       at 
> org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.requestMemorySegments(NetworkBufferPool.java:60)
>  ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
>       at 
> org.apache.flink.runtime.io.network.partition.consumer.BufferManager.requestExclusiveBuffers(BufferManager.java:131)
>  ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
>       at 
> org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.setup(RemoteInputChannel.java:148)
>  ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
>       at 
> org.apache.flink.runtime.io.network.partition.consumer.RemoteRecoveredInputChannel.toInputChannelInternal(RemoteRecoveredInputChannel.java:76)
>  ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
>       at 
> org.apache.flink.runtime.io.network.partition.consumer.RecoveredInputChannel.toInputChannel(RecoveredInputChannel.java:91)
>  ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
>       at 
> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.convertRecoveredInputChannels(SingleInputGate.java:299)
>  ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
>       at 
> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.requestPartitions(SingleInputGate.java:285)
>  ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
>       at 
> org.apache.flink.runtime.taskmanager.InputGateWithMetrics.requestPartitions(InputGateWithMetrics.java:94)
>  ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47)
>  ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
>       at 
> org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:78) 
> ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
>       at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:283)
>  ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
>       at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:184)
>  ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
>       ... 5 more
> {code}
>  It seems to be an exception in the network stack.
> The full log of the job is attached below.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
