[
https://issues.apache.org/jira/browse/FLINK-14163?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17011568#comment-17011568
]
Yuan Mei edited comment on FLINK-14163 at 1/9/20 9:34 AM:
----------------------------------------------------------
Thanks for assigning the task to me!
I have written a first version based on the discussion. Key changes:
# change Execution#producedPartitions to Execution#producedPartitionsFuture,
and initialize it as an incomplete future
# assign the producedPartitionsFuture in Execution#registerProducedPartitions
# wrap any access to producedPartitions in a synchronous function. If async
registration is needed later, callbacks can be added to replace this method.
The function is quite simple: fail if the producedPartitionsFuture is not done,
otherwise return producedPartitionsFuture.get()
Code Link:
[https://github.com/apache/flink/compare/master...curcur:shuffle_master_async_interface?expand=1]
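The linked branch is the authoritative version. As a rough illustration of the three changes above, here is a minimal sketch. It is not the Flink code: ExecutionSketch, getProducedPartitionsOrThrow and the Map<String, String> value type are hypothetical stand-ins, and join() is used in place of get() only to avoid checked exceptions in the example.

```java
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for Execution. Only producedPartitionsFuture and
// registerProducedPartitions are names taken from the discussion.
class ExecutionSketch {
    // Change 1: the field is a future and starts out incomplete.
    private CompletableFuture<Map<String, String>> producedPartitionsFuture =
            new CompletableFuture<>();

    // Change 2: the future is assigned during registration.
    void registerProducedPartitions(CompletableFuture<Map<String, String>> registration) {
        this.producedPartitionsFuture = registration;
    }

    // Change 3: synchronous accessor that fails if registration is not finished.
    Map<String, String> getProducedPartitionsOrThrow() {
        if (!producedPartitionsFuture.isDone()) {
            throw new IllegalStateException("produced partitions are not registered yet");
        }
        // The future is already done here, so join() returns without blocking.
        return producedPartitionsFuture.join();
    }

    public static void main(String[] args) {
        ExecutionSketch execution = new ExecutionSketch();
        try {
            execution.getProducedPartitionsOrThrow();
        } catch (IllegalStateException e) {
            System.out.println("before registration: " + e.getMessage());
        }
        execution.registerProducedPartitions(CompletableFuture.completedFuture(
                Collections.singletonMap("partition-1", "descriptor-1")));
        System.out.println("after registration: " + execution.getProducedPartitionsOrThrow());
    }
}
```

With this shape, any access before Execution#registerProducedPartitions hits the incomplete initial future and fails, which is the behavior the failing unit tests run into.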
This works fine if producedPartitions are never supposed to be accessed without
registration, which is natural since we have to assign before using. Note
that `registration` and `registration finished` are different. The former refers
to whether registration is always required
(Execution#registerProducedPartitions is always called before any access), and
the latter refers to whether the partitions have been successfully registered.
However, I find that a lot of the unit tests fail because producedPartitions is
accessed without Execution#registerProducedPartitions having been called, for example
ExecutionVertexDeploymentTest#testDeployCall()
ExecutionGraphCheckpointCoordinatorTest#testShutdownCheckpointCoordinatorOnFailure()
etc.
In the old version, producedPartitions is initialized as an empty map, which works
well in cases where the real value of producedPartitions is not needed.
I am wondering whether this is just a shortcut for tests, or whether it is also
used/allowed in some places in the prod path?
If access without registration is possible in prod, we can make
producedPartitionsFuture an Optional to distinguish whether
Execution#registerProducedPartitions has been called or not, like this:
[https://github.com/apache/flink/compare/master...curcur:optional?expand=1]
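A minimal sketch of how the Optional variant could look, under my own assumptions rather than copied from the branch: OptionalExecutionSketch and the Map<String, String> value type are hypothetical, and returning an empty map when registration was never called is assumed here only to mirror the old empty-map default.

```java
import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for Execution, not the actual Flink class.
class OptionalExecutionSketch {
    // Empty means registerProducedPartitions has never been called.
    private Optional<CompletableFuture<Map<String, String>>> producedPartitionsFuture =
            Optional.empty();

    void registerProducedPartitions(CompletableFuture<Map<String, String>> registration) {
        this.producedPartitionsFuture = Optional.of(registration);
    }

    Map<String, String> getProducedPartitions() {
        if (!producedPartitionsFuture.isPresent()) {
            // Assumption: never registered behaves like the old empty-map default.
            return Collections.emptyMap();
        }
        CompletableFuture<Map<String, String>> future = producedPartitionsFuture.get();
        if (!future.isDone()) {
            // Registered, but the registration has not finished.
            throw new IllegalStateException("partition registration has not finished");
        }
        return future.join();
    }

    public static void main(String[] args) {
        OptionalExecutionSketch execution = new OptionalExecutionSketch();
        System.out.println("never registered: " + execution.getProducedPartitions());
        execution.registerProducedPartitions(CompletableFuture.completedFuture(
                Collections.singletonMap("partition-1", "descriptor-1")));
        System.out.println("registered: " + execution.getProducedPartitions());
    }
}
```

This keeps the two cases apart: "never registered" stays usable for callers that do not need the real value, while "registered but not finished" still fails.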
Or a safer and simpler change is to keep all interfaces and usages as they are,
and directly check isDone() and call get() after registration of
producedPartitions, like this:
[https://github.com/apache/flink/compare/master...curcur:simpler_way?expand=1]
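As a rough sketch of this simpler option (again under my own assumptions, not the branch code): SimplerExecutionSketch and the Map<String, String> value type are hypothetical, the field stays a plain map with the old empty default, and the isDone() check happens once inside registration. join() stands in for get() to avoid checked exceptions.

```java
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for Execution, not the actual Flink class.
class SimplerExecutionSketch {
    // Unchanged from the old version: a plain map, empty until registration.
    private Map<String, String> producedPartitions = Collections.emptyMap();

    void registerProducedPartitions(CompletableFuture<Map<String, String>> registration) {
        // Reject shuffle masters that return a pending future.
        if (!registration.isDone()) {
            throw new IllegalStateException("async partition registration is not supported");
        }
        this.producedPartitions = registration.join();
    }

    // All existing usages keep reading the map directly.
    Map<String, String> getProducedPartitions() {
        return producedPartitions;
    }

    public static void main(String[] args) {
        SimplerExecutionSketch execution = new SimplerExecutionSketch();
        System.out.println("before registration: " + execution.getProducedPartitions());
        execution.registerProducedPartitions(CompletableFuture.completedFuture(
                Collections.singletonMap("partition-1", "descriptor-1")));
        System.out.println("after registration: " + execution.getProducedPartitions());
    }
}
```

The trade-off is that a custom shuffle master returning a pending future fails at registration time instead of being supported.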
> Execution#producedPartitions is possibly not assigned when used
> ---------------------------------------------------------------
>
> Key: FLINK-14163
> URL: https://issues.apache.org/jira/browse/FLINK-14163
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Coordination
> Affects Versions: 1.9.0, 1.10.0
> Reporter: Zhu Zhu
> Assignee: Yuan Mei
> Priority: Major
> Fix For: 1.10.0
>
>
> Currently {{Execution#producedPartitions}} is assigned after the partitions
> have completed the registration to shuffle master in
> {{Execution#registerProducedPartitions(...)}}.
> The partition registration is an async interface
> ({{ShuffleMaster#registerPartitionWithProducer(...)}}), so
> {{Execution#producedPartitions}} is possibly[1] not set when used.
> Usages include:
> 1. deploying this task, so that the task may be deployed without its result
> partitions assigned, and the job would hang. (DefaultScheduler issue only,
> since legacy scheduler handled this case)
> 2. generating input descriptors for downstream tasks:
> 3. retrieving {{ResultPartitionID}} for partition releasing:
> [1] If a user uses Flink's default shuffle master {{NettyShuffleMaster}}, it is
> not problematic at the moment since it returns a completed future on
> registration, so that it would be a synchronous process. However, if users
> implement their own shuffle service in which
> {{ShuffleMaster#registerPartitionWithProducer}} returns a pending future, it
> can be a problem. This is possible since customizable shuffle service is open
> to users since 1.9 (via config "shuffle-service-factory.class").
> To avoid these issues, we may either
> 1. fix all the usages of {{Execution#producedPartitions}} regarding the async
> assigning, or
> 2. change {{ShuffleMaster#registerPartitionWithProducer(...)}} to a sync
> interface