Weijie Guo created FLINK-31330:
----------------------------------
Summary: Batch shuffle may deadlock for operator with priority input
Key: FLINK-31330
URL: https://issues.apache.org/jira/browse/FLINK-31330
Project: Flink
Issue Type: Technical Debt
Components: Runtime / Network
Affects Versions: 1.16.1
Reporter: Weijie Guo
Assignee: Weijie Guo
For batch jobs, some operators' inputs have priority. For example, the hash join
operator has two inputs, {{build}} and {{probe}}, and the probe input can only
start consuming after the build input has finished. Unfortunately, input priority
does not affect when the inputs request upstream data (i.e. request partitions).
In the current implementation, once all states are restored, the inputGate
starts requesting partitions for every input. This makes the upstream
{{IO scheduler}} register readers for all downstream channels at once, which
opens the possibility of deadlock.
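A minimal Java sketch of the ordering problem, assuming a simplified model: {{InputModel}} and {{PartitionRequestModel}} are hypothetical names, not Flink classes, and a lower priority value means the input is consumed first. The eager method mirrors the current behavior described above, where every input requests its partition as soon as state is restored. The priority-aware method is only one possible alternative, shown for contrast, not an adopted fix.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

/** Hypothetical input of a multi-input operator; a lower priority value is consumed first. */
final class InputModel {
    final String name;
    final int priority;

    InputModel(String name, int priority) {
        this.name = name;
        this.priority = priority;
    }
}

/** Toy model of when inputs request upstream partitions (not Flink's input gate). */
final class PartitionRequestModel {
    /** Current behavior: every input requests its partition once state is restored. */
    static List<String> eagerRequests(List<InputModel> inputs) {
        List<String> requested = new ArrayList<>();
        for (InputModel in : inputs) {
            // Upstream registers a reader for this channel immediately, regardless of priority.
            requested.add(in.name);
        }
        return requested;
    }

    /** Alternative for contrast: only unfinished inputs with the best priority request data. */
    static List<String> priorityAwareRequests(List<InputModel> inputs, Set<String> finished) {
        int best = Integer.MAX_VALUE;
        for (InputModel in : inputs) {
            if (!finished.contains(in.name)) {
                best = Math.min(best, in.priority);
            }
        }
        List<String> requested = new ArrayList<>();
        for (InputModel in : inputs) {
            // Lower-priority inputs wait until every higher-priority input has finished.
            if (!finished.contains(in.name) && in.priority == best) {
                requested.add(in.name);
            }
        }
        return requested;
    }
}
```

For a hash join with {{build}} (priority 0) and {{probe}} (priority 1), the eager variant requests both partitions at once, while the priority-aware variant requests only {{build}} until it is marked finished.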
Assume that the upstream tasks of the hash join's build and probe inputs are
deployed in the same TM. Then the corresponding readers are registered to a
single {{IO scheduler}} and share the same {{BatchShuffleReadBufferPool}}. If the
IO thread happens to load too many buffers for the probe reader, those buffers
are not released, because the downstream will not consume probe data until the
build input has finished. The build reader may then be unable to request enough
buffers, so the build input never finishes, and a deadlock occurs.
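The scenario can be reproduced in a toy, single-threaded model of readers sharing one bounded pool. {{SharedPoolModel}} and its methods are hypothetical and do not reflect the real {{BatchShuffleReadBufferPool}} API. {{buildNeeds}} stands for an assumed minimum number of buffers the build reader needs to make progress.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Toy model of readers sharing one bounded read-buffer pool (hypothetical, not Flink's API). */
final class SharedPoolModel {
    private int available;
    private final Map<String, Integer> held = new LinkedHashMap<>();

    SharedPoolModel(int totalBuffers) {
        this.available = totalBuffers;
    }

    /** IO thread loads up to {@code count} buffers for a reader; returns how many it got. */
    int load(String reader, int count) {
        int granted = Math.min(count, available);
        available -= granted;
        held.merge(reader, granted, Integer::sum);
        return granted;
    }

    /** Downstream consumes a reader's data, returning all its buffers to the pool. */
    void consume(String reader) {
        available += held.getOrDefault(reader, 0);
        held.put(reader, 0);
    }

    int available() {
        return available;
    }

    /**
     * True if the build reader can get {@code buildNeeds} buffers after the probe reader
     * already holds {@code probeLoaded}. Probe buffers are never consumed here, because the
     * hash join only reads the probe input after the build input finishes.
     */
    static boolean buildCanProgress(int poolSize, int probeLoaded, int buildNeeds) {
        SharedPoolModel pool = new SharedPoolModel(poolSize);
        pool.load("probe", probeLoaded);
        return pool.load("build", buildNeeds) == buildNeeds;
    }
}
```

With a 16-buffer pool, {{buildCanProgress(16, 8, 4)}} returns true, while {{buildCanProgress(16, 16, 4)}} returns false: the probe buffers are never consumed, so nothing is returned to the pool for the build reader.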
In fact, we were aware of this problem from the beginning of the
{{SortMergeShuffle}} design, so we introduced a timeout mechanism when requesting
read buffers. If the timeout is hit, the downstream task triggers a failover to
avoid blocking forever. However, under the default configuration, a TPC-DS test
with 10T of data can easily make the job fail for this reason. This problem
needs a better solution.
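The timeout idea can be sketched with {{java.util.concurrent.Semaphore}}, treating each permit as one read buffer. {{TimedBufferPool}} is a hypothetical class, not Flink's implementation, and throwing {{TimeoutException}} stands in for whatever error the real code raises to trigger failover.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical read-buffer pool whose requests give up after a timeout instead of blocking. */
final class TimedBufferPool {
    private final Semaphore permits;
    private final long timeoutMillis;

    TimedBufferPool(int totalBuffers, long timeoutMillis) {
        this.permits = new Semaphore(totalBuffers);
        this.timeoutMillis = timeoutMillis;
    }

    /** Requests {@code count} buffers; throws TimeoutException so the caller can fail over. */
    void requestBuffers(int count) throws InterruptedException, TimeoutException {
        if (!permits.tryAcquire(count, timeoutMillis, TimeUnit.MILLISECONDS)) {
            throw new TimeoutException(
                    "Buffer request timed out after " + timeoutMillis + " ms");
        }
    }

    /** Returns {@code count} buffers to the pool, e.g. after downstream consumed the data. */
    void recycle(int count) {
        permits.release(count);
    }

    int available() {
        return permits.availablePermits();
    }
}
```

The timeout turns a permanent hang into a task failure, but it does not address the buffer contention itself: if the probe reader again holds most of the pool after failover, the request can time out again.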