[
https://issues.apache.org/jira/browse/FLINK-12131?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Zhu Zhu updated FLINK-12131:
----------------------------
Description:
Currently the *IntermediateResult* status is only reset when its producer
*ExecutionJobVertex* is reset.
When the region failover strategy is enabled, the vertices of the failed
region are reset through *ExecutionVertex.resetForNewExecution()*. The
*numberOfRunningProducers* counter in
*IntermediateResult*, however, is not adjusted in this case.
So if a FINISHED vertex is restarted and finishes again, the counter may drop
below 0.
In addition, the consumable property of the partition is not reset either.
This may lead to incorrect results from the input state check for lazy
scheduling.
I'd propose invoking *IntermediateResultPartition.resetForNewExecution()* in
*ExecutionVertex.resetForNewExecution()* and resetting the
*numberOfRunningProducers* counter and *IntermediateResultPartition* there.
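
To illustrate the counter problem and the proposed reset, here is a minimal sketch in Java. It is a simplified model and not the actual Flink code: *ResultModel*, *RegionFailoverSketch*, *markFinished()* and *resetPartitionForNewExecution()* are hypothetical names, and the sketch assumes a partition becomes consumable when its producer finishes.

```java
// Simplified, hypothetical model of an intermediate result; NOT the Flink classes.
final class ResultModel {
    private int numberOfRunningProducers;
    private final boolean[] consumable;

    ResultModel(int numberOfPartitions) {
        // Every partition starts with a running producer and no consumable data.
        this.numberOfRunningProducers = numberOfPartitions;
        this.consumable = new boolean[numberOfPartitions];
    }

    /** Called when the producer of the given partition finishes. */
    void markFinished(int partition) {
        consumable[partition] = true;
        numberOfRunningProducers--;
    }

    /**
     * Sketch of the proposed per-partition reset: undo the effects of an
     * earlier finish so that the producer can run and finish again.
     */
    void resetPartitionForNewExecution(int partition) {
        if (consumable[partition]) {
            consumable[partition] = false;
            numberOfRunningProducers++;
        }
    }

    int getNumberOfRunningProducers() {
        return numberOfRunningProducers;
    }

    boolean isConsumable(int partition) {
        return consumable[partition];
    }
}

final class RegionFailoverSketch {
    /**
     * A FINISHED producer is restarted and finishes again. The partition is
     * reset before the restart only if applyReset is true.
     */
    static int finishRestartFinish(boolean applyReset) {
        ResultModel result = new ResultModel(1);
        result.markFinished(0);                       // counter: 1 -> 0
        if (applyReset) {
            result.resetPartitionForNewExecution(0);  // counter: 0 -> 1
        }
        result.markFinished(0);                       // 0 -> -1, or 1 -> 0 with reset
        return result.getNumberOfRunningProducers();
    }

    public static void main(String[] args) {
        System.out.println("without reset: " + finishRestartFinish(false)); // -1
        System.out.println("with reset:    " + finishRestartFinish(true));  // 0
    }
}
```

Without the reset, the partition in this model also stays consumable while its producer is running again, which corresponds to the stale input state mentioned above.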
> Resetting ExecutionVertex in region failover may cause inconsistency of
> IntermediateResult status
> -------------------------------------------------------------------------------------------------
>
> Key: FLINK-12131
> URL: https://issues.apache.org/jira/browse/FLINK-12131
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Coordination
> Affects Versions: 1.9.0
> Reporter: Zhu Zhu
> Assignee: Zhu Zhu
> Priority: Major