zhuzhurk edited a comment on pull request #13628:
URL: https://github.com/apache/flink/pull/13628#issuecomment-708543149


   > Thanks for creating this fix @zhuzhurk. If I understand the problem 
correctly, then I think the PR fixes the problem. However, I have a couple of 
related questions:
   > 
   > 1. `PipelinedRegionSchedulingConcurrentFailureTest. 
testNoImmediateSlotAllocationFailureOnConcurrentFailure` seems to be able to 
reproduce the problem because the `RestartPipelinedRegionFailoverStrategy` also 
cancels not yet started downstream consumers. Why is this the case? Shouldn't 
it only be necessary to cancel `Executions` which are not `CREATED`?
   > 2. `ExecutionGraphToInputsLocationsRetrieverAdapter` seems to solve the
problem of deadlocking the `MergingSharedSlotProfileRetriever` by not returning
an input future for an execution which is about to be scheduled, which it
detects by checking for the `CREATED` state. However, it does return the input
location future if the `Execution` has a different state. To me this seems an
odd responsibility of an `InputLocationsRetriever`. I think it would make more
sense to filter such dependencies out at a different level.
   > 3. Why does `MergingSharedSlotProfileRetriever.getSlotProfileFuture`
return a future? At the time of scheduling a pipelined region, shouldn't all
its inputs and their locations be known?
   > 4. The fix limits the set of input locations to the specified bulk.
However, the bulk does not need to contain all `Executions` which can share the
slot. Hence, by doing this filtering, we give up potential information about
other input locations for tasks which can also run in the same slot. Shouldn't
we try to obtain their input locations and only ignore them if we cannot get
them?
   > 5. In `PipelinedRegionSchedulingConcurrentFailureTest`, it seems as if
`Executions` of `v3` and `v4` are in the same `ExecutionSlotSharingGroup` as
`Executions` of `v1` and `v2`. I think this is also the reason why these
`ExecutionVertices` are actually asked for their input locations. Given that
`v2` and `v3` are separated by a blocking data exchange, how can `Executions`
of these vertices share a slot? Wouldn't it also mean that we are asking for a
too large slot because
`SlotSharingExecutionSlotAllocator.getPhysicalSlotResourceProfile` includes
`v1, v2, v3, v4` in one slot?
   > 
   > If there is no strong need for waiting for a future input location, I
would say that it would probably be easiest if the
`MergingSharedSlotProfileRetriever` checks all `Executions` of its
`ExecutionSlotSharingGroup` and collects the available input locations. Based
on this information it can then make an approximate decision. That way, it
should also ignore currently failed tasks. Moreover, we should check how the
`ExecutionSlotSharingGroups` are calculated. There might be a small inaccuracy
in it.
   > 
   > cc @azagrebin
   
   Thanks for the thoughtful comments. @tillrohrmann
   I agree that we can simplify `MergingSharedSlotProfileRetriever` to directly
return a `SlotProfile` instead of a future, by just collecting the available
input locations. This should be enough for pipelined region scheduling, which
schedules a region only after all its input regions have finished.
   Regarding the inaccuracy of `ExecutionSlotSharingGroups`, does `#5` or the
other comment answer your question?
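
   A minimal sketch of that idea, with placeholder types standing in for the
Flink classes and a hypothetical method shape (not the actual
`MergingSharedSlotProfileRetriever` API): walk the executions of the sharing
group, keep only producer locations that are already decided, and build the
profile synchronously instead of composing futures.

   ```java
   import java.util.ArrayList;
   import java.util.Collection;
   import java.util.List;
   import java.util.Optional;
   import java.util.function.Function;

   /** Placeholder stand-ins for the Flink runtime classes of the same names. */
   final class TaskManagerLocation {}
   final class ExecutionVertexID {}
   final class SlotProfile {
       final List<TaskManagerLocation> preferredLocations;
       SlotProfile(List<TaskManagerLocation> preferredLocations) {
           this.preferredLocations = preferredLocations;
       }
   }

   final class MergingSlotProfileSketch {
       /**
        * Builds the slot profile from input locations that are already known;
        * producers whose location is not yet decided are simply skipped.
        */
       static SlotProfile getSlotProfile(
               Collection<ExecutionVertexID> executionsInSharingGroup,
               Function<ExecutionVertexID, Collection<ExecutionVertexID>> producersOf,
               Function<ExecutionVertexID, Optional<TaskManagerLocation>> availableLocationOf) {
           List<TaskManagerLocation> preferredLocations = new ArrayList<>();
           for (ExecutionVertexID execution : executionsInSharingGroup) {
               for (ExecutionVertexID producer : producersOf.apply(execution)) {
                   availableLocationOf.apply(producer).ifPresent(preferredLocations::add);
               }
           }
           return new SlotProfile(preferredLocations);
       }
   }
   ```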
   
   Below are replies to each listed question:
   1. This is an unwanted side effect of restarting all downstream tasks of a
failed task to avoid non-determinism issues. I agree we should skip canceling
tasks in the CREATED state. This can reduce a lot of unnecessary canceling
logs, especially for large-scale batch jobs.
   2. Agreed, it's odd. But given that EAGER/LAZY scheduling will be kept for a
while, and location futures are still needed there, maybe we can leave it as
is. For pipelined region scheduling, we can introduce
`PreferredLocationsRetrieverV2` and `InputsLocationsRetrieverV2`, which
directly return locations instead of futures. Only available locations would be
considered in this case.
   3. If we consider the whole `ExecutionSlotSharingGroup`, some vertices may
not have been scheduled yet, so their input locations may not be decided.
However, we can change it to only consider the locations available at that
time, as mentioned in `#2`.
   4. Yes, you are right. It would be better to take those not-yet-scheduled
vertices into consideration when calculating the preferred input locations.
   5. The case mocks a `DataSet` job. Currently, all job vertices of a
`DataSet` job are put into the same `SlotSharingGroup`. I think we need to
change this and assign each logical pipelined region a different slot sharing
group, similar to how blink planner batch jobs do it (see the sketch after this
list). This is something we missed when enabling pipelined region scheduling
for all kinds of jobs.
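
   To make `#5` concrete, here is a rough sketch of giving every logical
pipelined region its own slot sharing group. The types are placeholders and the
helper is hypothetical; the real assignment would live wherever the job graph
is generated for DataSet jobs.

   ```java
   import java.util.List;
   import java.util.Set;

   /** Placeholder stand-ins for the classes in Flink's jobgraph package. */
   final class SlotSharingGroup {}
   final class JobVertex {
       private SlotSharingGroup slotSharingGroup;
       void setSlotSharingGroup(SlotSharingGroup group) { this.slotSharingGroup = group; }
   }

   final class RegionSlotSharingAssigner {
       /**
        * Gives every logical pipelined region its own slot sharing group so
        * that vertices separated by a blocking data exchange (e.g. v2 and v3
        * above) no longer end up in the same slot.
        */
       static void assignPerRegionSlotSharingGroups(List<Set<JobVertex>> logicalPipelinedRegions) {
           for (Set<JobVertex> region : logicalPipelinedRegions) {
               SlotSharingGroup regionGroup = new SlotSharingGroup();
               for (JobVertex vertex : region) {
                   vertex.setSlotSharingGroup(regionGroup);
               }
           }
       }
   }
   ```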
   
   In summary, here are the actions to take:
   1. Fix FLINK-19552 in a different way, i.e. introduce
`PreferredLocationsRetrieverV2` and `InputsLocationsRetrieverV2`, which
directly return only the currently available locations
   2. Change the default `SlotSharingGroup` assignment for DataSet jobs so that
each logical pipelined region gets a different slot sharing group
   3. Change region failover to skip restarting tasks in the CREATED state (see
the sketch below)
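
   As a rough illustration of action 3 (placeholder types, hypothetical helper;
the actual change would go into the failover handling), the set of tasks to
restart could simply drop executions that are still in CREATED:

   ```java
   import java.util.Set;
   import java.util.function.Function;
   import java.util.stream.Collectors;

   /** Placeholder stand-ins for the Flink runtime classes of the same names. */
   enum ExecutionState { CREATED, SCHEDULED, DEPLOYING, RUNNING } // abridged
   final class ExecutionVertexID {}

   final class FailoverRestartFilter {
       /**
        * Drops vertices that are still in CREATED from the tasks to restart;
        * they have never been deployed, so cancelling them only produces noise.
        */
       static Set<ExecutionVertexID> skipCreatedTasks(
               Set<ExecutionVertexID> tasksToRestart,
               Function<ExecutionVertexID, ExecutionState> currentStateOf) {
           return tasksToRestart.stream()
                   .filter(id -> currentStateOf.apply(id) != ExecutionState.CREATED)
                   .collect(Collectors.toSet());
       }
   }
   ```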
   
   WDYT?

