[
https://issues.apache.org/jira/browse/FLINK-19693?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17225464#comment-17225464
]
Jin Xing commented on FLINK-19693:
----------------------------------
Hi, Yuan ~
I just read through this series of JIRAs & PRs. This is a really good feature,
and thanks a lot for the deep explanation and analysis in the JIRAs and PRs;
they were really helpful for understanding!
After "FLIP-31: Pluggable Shuffle Service", users can extend and plug in new
shuffle manner, thus to benefit different scenarios. New shuffle manner tend to
bring in new abilities (e.g. the approximate local recovering ability by
PipelinedApproximateSubpartition and PipelinedApproximateSubpartitionView),
which could be exploited by scheduling layer (like scheduler, lifecycle
management and so on) to provide better performance.
From my understanding, the characteristics of a shuffle implementation are
exposed through ResultPartitionType (e.g. isPipelined, isBlocking,
hasBackPressure ...) and exploited by the scheduling layer to run the job. But
it seems that Flink doesn't provide a way to describe new characteristics of a
plugged-in shuffle implementation. I also find that the scheduling layer makes
some weak assumptions about ResultPartitionType. Let me illustrate with the
example below.
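As a reference point, today these characteristics are hard-coded as boolean
flags on the enum. The following is only a simplified sketch of that model
(constants and getters are abridged from memory, not copied from the
flink-runtime source):
{code:java}
// Simplified sketch of ResultPartitionType: each constant fixes a set of
// boolean characteristics that the scheduling layer reads (abridged; the
// real enum in flink-runtime has more constants and fields).
public enum ResultPartitionType {
    //        isPipelined, isBlocking, hasBackPressure
    BLOCKING (false,       true,       false),  // fully produced before consumption
    PIPELINED(true,        false,      true);   // consumed while produced, back-pressured

    private final boolean isPipelined;
    private final boolean isBlocking;
    private final boolean hasBackPressure;

    ResultPartitionType(boolean isPipelined, boolean isBlocking, boolean hasBackPressure) {
        this.isPipelined = isPipelined;
        this.isBlocking = isBlocking;
        this.hasBackPressure = hasBackPressure;
    }

    public boolean isPipelined()     { return isPipelined; }
    public boolean isBlocking()      { return isBlocking; }
    public boolean hasBackPressure() { return hasBackPressure; }
}
{code}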
In our internal Flink, we developed a new shuffle implementation for batch
jobs. Its characteristics can be summarized briefly as below:
1. The upstream task shuffle-writes data to DISK;
2. The upstream task commits data while producing and notifies downstream that
the data is "consumable" BEFORE the task finishes;
3. A downstream task is notified once upstream data becomes consumable and can
be scheduled according to available resources;
4. When a downstream task fails over, only the task itself needs to be
restarted, because the upstream data is written to disk and replayable;
We can characterize this new shuffle implementation as:
a. *_isPipelined=true_* -- a downstream task can consume data before the
upstream task finishes;
b. _*hasBackPressure=false*_ -- the upstream task shuffle-writes data to disk
and can finish on its own, no matter whether a downstream task consumes the
data in time.
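Under the flag-based model sketched earlier, our shuffle implementation would
amount to a new enum constant like the one below; the name DISK_PIPELINED is
invented purely for illustration and does not exist in Flink:
{code:java}
// Hypothetical constant added to the sketched enum above (DISK_PIPELINED is
// not part of Flink): pipelined consumption without back pressure, since
// the produced data is persisted to disk and replayable.
//             isPipelined, isBlocking, hasBackPressure
DISK_PIPELINED(true,        false,      false);
{code}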
But the new _*ResultPartitionType(isPipelined=true, hasBackPressure=false)*_
above seems to contradict the current scheduling layer in several places:
1. The new shuffle implementation needs the partition tracker for lifecycle
management, but current Flink assumes that ONLY blocking shuffles support
being consumed multiple times, so an *_"isPipelined=true"_* shuffle will not
be taken care of by the partition tracker
([link|https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/JobMasterPartitionTrackerImpl.java#L66])
-- this assumption does not hold for our case (see the first sketch after this
list).
2. In current Flink, the slot-sharing group is bound to the pipelined region:
two vertices are put into the same pipelined region & slot-sharing group if
they are connected by an _*"isPipelined=true"*_ edge
([link|https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/PipelinedRegionComputeUtil.java#L73]).
But with the new shuffle implementation, tasks are scheduled according to
available resources, upstream and downstream tasks are not guaranteed to be
scheduled at the same time, and there is no need to place them in the same
pipelined region or slot-sharing group (see the second sketch after this
list).
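Regarding the first assumption, the guard in the partition tracker boils down
to a flag check before tracking begins. Below is only a minimal paraphrase of
that reading: the method body is abbreviated, and
startTrackingPartitionInternal is a stand-in, not copied from
JobMasterPartitionTrackerImpl:
{code:java}
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;

// Paraphrase of the weak assumption in the partition tracker: only blocking
// partitions are registered, so a disk-backed "isPipelined=true" partition
// like ours is silently excluded from lifecycle management.
class TrackerAssumptionSketch {
    void startTrackingPartition(
            ResourceID producingTaskExecutorId,
            ResultPartitionDeploymentDescriptor descriptor) {
        if (!descriptor.getPartitionType().isBlocking()) {
            return; // assumes pipelined partitions never need tracking
        }
        startTrackingPartitionInternal(producingTaskExecutorId, descriptor);
    }

    void startTrackingPartitionInternal(
            ResourceID tmId, ResultPartitionDeploymentDescriptor descriptor) {
        // actual bookkeeping omitted in this sketch
    }
}
{code}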
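Regarding the second assumption, the region-building logic merges any two
vertices connected by a pipelined edge. A simplified sketch of that loop
follows; the signatures are abbreviated from memory, and regionOf/mergeRegions
are hypothetical stand-ins for the region-merging in
PipelinedRegionComputeUtil:
{code:java}
import org.apache.flink.runtime.scheduler.strategy.SchedulingExecutionVertex;
import org.apache.flink.runtime.scheduler.strategy.SchedulingResultPartition;
import org.apache.flink.runtime.scheduler.strategy.SchedulingTopology;

import java.util.HashSet;
import java.util.Set;

// Every "isPipelined=true" edge glues producer and consumer into the same
// pipelined region (and hence the same slot-sharing group), even when, as
// with our shuffle, the two sides never need to run concurrently.
class RegionAssumptionSketch {
    void buildRegions(SchedulingTopology topology) {
        for (SchedulingExecutionVertex vertex : topology.getVertices()) {
            for (SchedulingResultPartition consumed : vertex.getConsumedResults()) {
                if (consumed.getResultType().isPipelined()) {
                    mergeRegions(regionOf(consumed.getProducer()), regionOf(vertex));
                }
            }
        }
    }

    // hypothetical helpers standing in for the union-find-style merging
    Set<SchedulingExecutionVertex> regionOf(SchedulingExecutionVertex v) {
        return new HashSet<>();
    }

    void mergeRegions(Set<SchedulingExecutionVertex> a, Set<SchedulingExecutionVertex> b) {
        a.addAll(b);
    }
}
{code}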
I think allowing pluggable shuffle services to define their own
"ResultPartitionType" may need a longer discussion, but shall we sort out and
apply some easy fixes for the weak assumptions above, if the concerns are
valid? A purely hypothetical sketch of such an extension follows.
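To make the idea concrete, one possible direction is to let the pluggable
shuffle service declare characteristics through an interface instead of the
fixed enum; nothing named ShufflePartitionCharacteristics exists in Flink
today, so this is only an illustration of the shape such an extension could
take:
{code:java}
// Purely hypothetical sketch: characteristics declared by the pluggable
// shuffle service rather than hard-coded as ResultPartitionType constants.
public interface ShufflePartitionCharacteristics {
    boolean isPipelined();      // consumable before the producer finishes
    boolean hasBackPressure();  // producer throttled by slow consumers
    boolean isReplayable();     // survives failover, so it needs tracking
}
{code}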
Also cc [~trohrmann] and [~zhuzh], Flink experts, to shepherd this.
> Scheduler Change for Approximate Local Recovery to Restart Downstream of a
> Failed Task
> --------------------------------------------------------------------------------------
>
> Key: FLINK-19693
> URL: https://issues.apache.org/jira/browse/FLINK-19693
> Project: Flink
> Issue Type: Sub-task
> Components: Runtime / Coordination
> Reporter: Yuan Mei
> Assignee: Yuan Mei
> Priority: Major
> Labels: pull-request-available
>
> Enables downstream failover for approximate local recovery.
> That is to say, if a task fails, all tasks downstream of it restart,
> including the failed task itself. This is achieved by reusing the existing
> {{RestartPipelinedRegionFailoverStrategy}} --- treating each individual task
> connected by ResultPartitionType.PIPELINED_APPROXIMATE as a separate region.
>
> It introduces an attribute "reconnectable" in ResultPartitionType to indicate
> whether the partition is reconnectable. Notice that this is only a temporary
> solution for now. It will be removed after:
> # Approximate local recovery has its own failover strategy to restart the
> failed set of tasks instead of restarting the downstream of failed tasks
> based on {{RestartPipelinedRegionFailoverStrategy}}
> # FLINK-19895: Unify the life cycle of the ResultPartitionType Pipelined
> Family.
> There is also a good discussion on this in FLINK-19632.