[ 
https://issues.apache.org/jira/browse/FLINK-12131?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu updated FLINK-12131:
----------------------------
    Description: 
Two statuses may become incorrect with region failover and the current reset logic.
 # *numberOfRunningProducers* in *IntermediateResult*.
 # *hasDataProduced* in *IntermediateResultPartition*.

This is because currently the related *IntermediateResult* (and its inner 
*IntermediateResultPartition*s) are reset only when the *ExecutionJobVertex* is 
reset. Region failover, however, resets only the affected *ExecutionVertex(es)*, 
rather than the entire *ExecutionJobVertex*.

The following problems may occur as a result:
 # when a FINISHED vertex is restarted and finishes again, 
*IntermediateResult.numberOfRunningProducers* may drop below 0 and throw an 
exception that triggers a global failover
 # *IntermediateResult.numberOfRunningProducers* can be smaller than the actual 
number of running producers, causing downstream vertices to be scheduled earlier 
than expected
 # an *IntermediateResultPartition* can be reset and not yet restarted while its 
*hasDataProduced* flag remains true

That's why I'd propose we add IntermediateResult status adjustment logic to 
*ExecutionVertex.resetForNewExecution()*.

Detailed design: 
[https://docs.google.com/document/d/1YA3k8rwDEv1UdaV9NwoDmwc-XorG__JUXlpyJtDs4Ss/edit?usp=sharing]
 
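To make the failure mode and the proposed fix concrete, here is a minimal, 
self-contained sketch. The classes below are hypothetical simplifications (not 
actual Flink code); they only mimic the *numberOfRunningProducers* counter 
behavior described above, under the assumption that resetting a single producer 
vertex should re-increment the counter:

```java
// Simplified stand-in for IntermediateResult's running-producer bookkeeping.
// Not Flink code: names mirror the real classes, logic is illustrative only.
class IntermediateResultSketch {
    int numberOfRunningProducers;

    IntermediateResultSketch(int producers) {
        this.numberOfRunningProducers = producers;
    }

    // Called when a producer ExecutionVertex reaches FINISHED.
    void producerFinished() {
        numberOfRunningProducers--;
        if (numberOfRunningProducers < 0) {
            // This is the symptom described above: the counter goes
            // negative and a global failover is triggered.
            throw new IllegalStateException(
                "numberOfRunningProducers dropped below 0");
        }
    }

    // Sketch of the proposed fix: when region failover resets a single
    // producer vertex (ExecutionVertex.resetForNewExecution()), restore
    // the counter so a second FINISHED transition stays consistent.
    void resetProducer() {
        numberOfRunningProducers++;
    }
}

public class Demo {
    public static void main(String[] args) {
        IntermediateResultSketch result = new IntermediateResultSketch(1);
        result.producerFinished(); // vertex finishes, counter -> 0

        // Region failover restarts only this vertex; with the proposed
        // reset the counter is restored before the vertex runs again.
        result.resetProducer();    // counter -> 1
        result.producerFinished(); // finishes again, counter -> 0, no exception

        System.out.println(result.numberOfRunningProducers); // prints 0
    }
}
```

Without the `resetProducer()` call, the second `producerFinished()` would drive 
the counter to -1 and throw, which is exactly problem 1 above.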

  was:
Currently the *IntermediateResult* status is only reset when its producer 
*ExecutionJobVertex* is reset.

When the region failover strategy is enabled, the vertices of the failed region 
are reset through *ExecutionVertex.resetForNewExecution()*. The 
*numberOfRunningProducers* counter in *IntermediateResult*, however, is not 
properly adjusted in this case.

So if a FINISHED vertex is restarted and finishes again, the counter may drop 
below 0.

Besides, the consumable property of the partition is not reset either. This may 
lead to incorrect input-state checks for lazy scheduling.

 

I'd propose to invoke *IntermediateResultPartition.resetForNewExecution()* in 
*ExecutionVertex.resetForNewExecution()* and reset the 
*numberOfRunningProducers* counter and *IntermediateResultPartition* there.

 


> Resetting ExecutionVertex in region failover may cause inconsistency of 
> IntermediateResult status
> -------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-12131
>                 URL: https://issues.apache.org/jira/browse/FLINK-12131
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Coordination
>    Affects Versions: 1.9.0
>            Reporter: Zhu Zhu
>            Assignee: Zhu Zhu
>            Priority: Major
>



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
