[ 
https://issues.apache.org/jira/browse/FLINK-10205?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16653673#comment-16653673
 ] 

ASF GitHub Bot commented on FLINK-10205:
----------------------------------------

tillrohrmann commented on issue #6684:     [FLINK-10205] Batch Job: InputSplit 
Fault tolerant for DataSource…
URL: https://github.com/apache/flink/pull/6684#issuecomment-430661796
 
 
   Alright, now the problem is a bit clearer to me. The underlying problem is 
that the semantics of the `InputSplitAssigner` in case of a failover are not 
well defined. This is mainly because Flink evolved over time.
   
   The general idea of the `InputSplitAssigner` is to lazily assign work to 
sources which have completely consumed their current `InputSplit`. The order in 
which this happens should not affect the correctness of the result.
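   The lazy, pull-based assignment described above could be sketched roughly 
like this (a simplified, hypothetical assigner; the class and method names are 
illustrative, not Flink's actual `InputSplitAssigner` API):

   ```java
   import java.util.ArrayDeque;
   import java.util.Queue;

   // Sketch of lazy split assignment: a source task pulls the next split
   // only after it has fully consumed its current one. For a stateless
   // source, which task pulls first does not affect correctness.
   class LazySplitAssigner {
       private final Queue<String> unassigned = new ArrayDeque<>();

       LazySplitAssigner(Iterable<String> splits) {
           for (String s : splits) {
               unassigned.add(s);
           }
       }

       // Called by a source task that finished its current split.
       // Returns null when no work is left.
       synchronized String getNextSplit() {
           return unassigned.poll();
       }
   }
   ```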
   
   If you say that in case of a recovery the exact same `InputSplit` assignment 
needs to happen again, then I think it must be because our sources have some 
kind of state. Otherwise, it should not matter which source task completes the 
`InputSplit`, right? If this is correct, then we would run into the same 
problem if a JM failure happens, because we would lose all `InputSplit` 
assignment information which is stored on the JM. So stateful sources with 
`InputSplits` don't work at the moment (in the general case).
   
   If we assume that our sources are stateless, then simply returning the input 
splits to the assigner and letting the next idle task take them should work. In 
your example of the infinite stream which is initialized via the `InputSplits`, 
there would be no other task competing for the `InputSplit` of a failed task, 
because by definition the tasks never finish their work, right? If multiple 
tasks fail, then the mapping might be different after the recovery, but every 
task would continue consuming from a single `InputSplit`. I think the problem 
here is that you abused the `InputSplitAssigner` for something it is not yet 
intended to do.
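   For the stateless case, "returning the split to the assigner" could look 
like the following self-contained sketch (again hypothetical names, not 
Flink's actual interface): on failover, the in-flight split of the failed task 
goes back into the pool, and whichever task asks next picks it up.

   ```java
   import java.util.ArrayDeque;
   import java.util.Deque;

   // Sketch: splits of a failed stateless source are handed back to the
   // assigner, so any idle task (not necessarily the restarted one) can
   // take them. The exact task-to-split mapping may change after recovery.
   class ReturningSplitAssigner {
       private final Deque<String> unassigned = new ArrayDeque<>();

       synchronized void addSplit(String split) {
           unassigned.add(split);
       }

       synchronized String getNextSplit() {
           return unassigned.poll();
       }

       // Called on task failure: the split is returned to the front of
       // the pool so it is re-processed before untouched splits.
       synchronized void returnSplit(String split) {
           unassigned.addFirst(split);
       }
   }
   ```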
   
   The reason why I'm a bit hesitant here is that I think we do not yet fully 
understand what we actually want to have. Moreover, some corner cases are not 
clear to me yet. For example, why would it be OK for a global failover to 
change the mapping but not for a region failover? Another example is how to 
handle the case where we lose a TM and need to downscale. Would that 
effectively be a global failover where we redistribute all `InputSplits`? (I 
would think so.) 
   
   Before starting any concrete implementation steps, I think we should 
properly design this feature to get it right. A very related topic is actually 
the new source interface. Depending on how much we are able to unify batch and 
streaming, the whole `InputSplit` assignment might move into a single task 
(similar to the `ContinuousFileMonitoringFunction`) and the assignment might 
become part of a checkpoint. That way, we would no longer need to take care of 
this on the JM side.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Batch Job: InputSplit Fault tolerant for DataSourceTask
> -------------------------------------------------------
>
>                 Key: FLINK-10205
>                 URL: https://issues.apache.org/jira/browse/FLINK-10205
>             Project: Flink
>          Issue Type: Sub-task
>          Components: JobManager
>    Affects Versions: 1.6.1, 1.6.2, 1.7.0
>            Reporter: JIN SUN
>            Assignee: JIN SUN
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.7.0
>
>   Original Estimate: 168h
>  Remaining Estimate: 168h
>
> Today, a DataSourceTask pulls InputSplits from the JobManager to achieve 
> better performance. However, when a DataSourceTask fails and reruns, it will 
> not get the same splits as its previous attempt. This can introduce 
> inconsistent results or even data corruption.
> Furthermore, if two executions run at the same time (in the batch scenario), 
> these two executions should process the same splits.
> We need to fix this issue to make the inputs of a DataSourceTask 
> deterministic. The proposal is to save all splits into the ExecutionVertex, 
> and the DataSourceTask will pull splits from there.
>  document:
> [https://docs.google.com/document/d/1FdZdcA63tPUEewcCimTFy9Iz2jlVlMRANZkO4RngIuk/edit?usp=sharing]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
