[ https://issues.apache.org/jira/browse/FLINK-10205?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16653673#comment-16653673 ]
ASF GitHub Bot commented on FLINK-10205:
----------------------------------------

tillrohrmann commented on issue #6684: [FLINK-10205] Batch Job: InputSplit Fault tolerant for DataSource…
URL: https://github.com/apache/flink/pull/6684#issuecomment-430661796

Alright, now the problem is a bit clearer to me. The underlying problem is that the `InputSplitAssigner`'s semantics in case of a failover are not well defined. This is mainly because Flink evolved over time. The general idea of the `InputSplitAssigner` is to lazily assign work to sources that have completely consumed their current `InputSplit`. The order in which this happens should not affect the correctness of the result.

If you say that in case of a recovery the exact same `InputSplit` assignment needs to happen again, then I think it must be because our sources have some kind of state. Otherwise, it should not matter which source task completes the `InputSplit`, right? If this is correct, then we would run into the same problem if a JM failure happens, because we would lose all `InputSplit` assignment information, which is stored on the JM. So stateful sources with `InputSplits` don't work at the moment (in the general case).

If we assume that our sources are stateless, then simply returning the input splits to the assigner and letting the next idling task take them should work. In your example of the infinite stream which is initialized via the `InputSplits`, there would be no other task competing for the `InputSplit` of a failed task, because by definition they never finish their work, right? If multiple tasks fail, then the mapping might be different after the recovery, but every task would still continue consuming from a single `InputSplit`. I think the problem here is that you abused the `InputSplitAssigner` for something it is not yet intended to do.

The reason why I'm a bit hesitant here is that I think we do not yet fully understand what we actually want to have.
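The "stateless sources" recovery idea described above — return a failed task's splits to the assigner and let any idle task pick them up — can be sketched roughly as follows. This is a minimal illustration with a hypothetical simplified assigner, not Flink's actual `InputSplitAssigner` interface:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Hypothetical simplified split assigner; not Flink's actual InputSplitAssigner API. */
class SimpleSplitAssigner {
    private final Deque<String> unassigned = new ArrayDeque<>();

    SimpleSplitAssigner(List<String> splits) {
        unassigned.addAll(splits);
    }

    /** Lazily hand out the next split, or null once everything is assigned. */
    synchronized String getNextInputSplit() {
        return unassigned.poll();
    }

    /** On task failure, put its unfinished splits back so any idle task can take them. */
    synchronized void returnInputSplits(List<String> splits) {
        splits.forEach(unassigned::addFirst);
    }
}

public class SplitReassignmentDemo {
    public static void main(String[] args) {
        SimpleSplitAssigner assigner =
                new SimpleSplitAssigner(List.of("split-0", "split-1", "split-2"));

        String a = assigner.getNextInputSplit(); // task A takes split-0
        String b = assigner.getNextInputSplit(); // task B takes split-1

        // Task A fails before finishing: its split goes back to the pool.
        assigner.returnInputSplits(List.of(a));

        // The next idle task (not necessarily A's restart) picks it up again.
        List<String> remaining = new ArrayList<>();
        String s;
        while ((s = assigner.getNextInputSplit()) != null) {
            remaining.add(s);
        }
        System.out.println(remaining); // [split-0, split-2] — no split lost or duplicated
    }
}
```

With stateless sources this is correct regardless of which task ends up with the returned split; as the comment notes, it is only stateful sources that require the exact same task-to-split mapping after recovery.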
Moreover, some corner cases are not clear to me yet. For example, why would it be OK for a global failover to change the mapping but not for a region failover? Another example is how to handle the case where we lose a TM and need to downscale. Would that effectively be a global failover where we redistribute all `InputSplits`? (I would think so.) Before starting any concrete implementation steps, I think we should properly design this feature to get it right.

A very related topic is actually the new source interface. Depending on how much we are able to unify batch and streaming, the whole `InputSplit` assignment might move into a single task (similar to the `ContinuousFileMonitoringSink`) and the assignment might become part of a checkpoint. That way, we would no longer need to take care of this on the JM side.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: us...@infra.apache.org

> Batch Job: InputSplit Fault tolerant for DataSourceTask
> -------------------------------------------------------
>
>                 Key: FLINK-10205
>                 URL: https://issues.apache.org/jira/browse/FLINK-10205
>             Project: Flink
>          Issue Type: Sub-task
>          Components: JobManager
>    Affects Versions: 1.6.1, 1.6.2, 1.7.0
>            Reporter: JIN SUN
>            Assignee: JIN SUN
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.7.0
>
>   Original Estimate: 168h
>  Remaining Estimate: 168h
>
> Today the DataSourceTask pulls InputSplits from the JobManager to achieve better performance. However, when a DataSourceTask fails and reruns, it will not get the same splits as its previous attempt; this can introduce inconsistent results or even data corruption.
> Furthermore, if two executions run at the same time (in the batch scenario), these two executions should process the same splits.
> We need to fix this issue to make the inputs of a DataSourceTask deterministic. The proposal is to save all splits into the ExecutionVertex, and the DataSourceTask will pull splits from there.
> Document: [https://docs.google.com/document/d/1FdZdcA63tPUEewcCimTFy9Iz2jlVlMRANZkO4RngIuk/edit?usp=sharing]

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
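The issue's proposal — remembering the splits assigned to each ExecutionVertex so that a restarted attempt pulls the same splits in the same order — can be sketched roughly like this. The class and method names below are illustrative only, not Flink's actual internals:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Illustrative sketch: record each vertex's assigned splits and replay them on failover. */
class DeterministicSplitTracker {
    private final List<String> unassigned;
    // Splits already handed to each execution vertex, kept for deterministic replay.
    private final Map<Integer, List<String>> assignedPerVertex = new HashMap<>();
    private final Map<Integer, Integer> replayCursor = new HashMap<>();

    DeterministicSplitTracker(List<String> splits) {
        this.unassigned = new ArrayList<>(splits);
    }

    /** First run: pull fresh splits; after a restart, replay the recorded ones first. */
    synchronized String getNextInputSplit(int vertexId) {
        List<String> history =
                assignedPerVertex.computeIfAbsent(vertexId, v -> new ArrayList<>());
        int cursor = replayCursor.getOrDefault(vertexId, history.size());
        if (cursor < history.size()) { // replaying after a failover
            replayCursor.put(vertexId, cursor + 1);
            return history.get(cursor);
        }
        if (unassigned.isEmpty()) {
            return null;
        }
        String split = unassigned.remove(0);
        history.add(split);
        replayCursor.put(vertexId, history.size());
        return split;
    }

    /** Mark a vertex as restarted: its next pulls replay the same splits, in order. */
    synchronized void resetForFailover(int vertexId) {
        replayCursor.put(vertexId, 0);
    }
}

public class DeterministicAssignmentDemo {
    public static void main(String[] args) {
        DeterministicSplitTracker tracker =
                new DeterministicSplitTracker(List.of("s0", "s1", "s2"));
        String first = tracker.getNextInputSplit(0); // vertex 0 gets s0
        tracker.getNextInputSplit(1);                // vertex 1 gets s1
        tracker.resetForFailover(0);                 // vertex 0 fails and restarts
        System.out.println(first.equals(tracker.getNextInputSplit(0))); // true: same split again
    }
}
```

Note this only illustrates the per-vertex replay idea; as the review discussion above points out, the hard questions (JM failover losing this state, downscaling, region vs. global failover) are exactly what such JM-side bookkeeping does not solve by itself.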