[
https://issues.apache.org/jira/browse/FLINK-12406?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Ruidong Li updated FLINK-12406:
-------------------------------
Description:
After each job finishes, new {{BLOCKING_PERSISTENT}} result partitions are
generated. The locations of these result partitions should be reported back to
the client via {{JobExecutionResult}}; they will later be used for Table
{{cache()}} and {{invalidateCache()}}.
Description
- The client uses {{ExecutionEnvironment}} to submit a batch job and waits for
the {{JobResult}} from the {{JM}}
- When a job finishes, the {{BLOCKING_PERSISTENT}} {{ResultPartition}} locations
are collected from each {{ExecutionVertex}} in the {{ExecutionGraph}}
- On the JM side, the {{BLOCKING_PERSISTENT}} {{ResultPartition}} locations flow
along this path: {{ExecutionGraph}} -> {{ArchivedExecutionGraph}} -> {{JobResult}}
- On the client side, a {{JobExecutionResult}} is created from the returned
{{JobResult}}
- The {{ExecutionEnvironment}} receives the {{JobExecutionResult}} containing
the locations and stores them
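As a rough illustration of the hand-off described above, the sketch below passes
one set of locations through each stage in order. Every class here is a
simplified, hypothetical stand-in (the names ending in {{Sketch}}, the string
IDs and the string locations are all assumptions), not the actual Flink classes
or the code in the pull request.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified, hypothetical stand-ins for the stages named above; not Flink's real classes.
// Locations are plain strings keyed by a data set ID string (both are assumptions).

/** JM side: the finished graph holding the collected locations. */
final class FinishedGraphSketch {
    private final Map<String, List<String>> locations;

    FinishedGraphSketch(Map<String, List<String>> locations) {
        this.locations = locations;
    }

    /** Takes an immutable snapshot of the locations for the archived form. */
    ArchivedGraphSketch archive() {
        return new ArchivedGraphSketch(Collections.unmodifiableMap(new HashMap<>(locations)));
    }
}

/** JM side: read-only snapshot of the finished graph. */
final class ArchivedGraphSketch {
    private final Map<String, List<String>> locations;

    ArchivedGraphSketch(Map<String, List<String>> locations) {
        this.locations = locations;
    }

    JobResultSketch toJobResult() {
        return new JobResultSketch(locations);
    }
}

/** The result object that travels from the JM to the client. */
final class JobResultSketch {
    private final Map<String, List<String>> locations;

    JobResultSketch(Map<String, List<String>> locations) {
        this.locations = locations;
    }

    JobExecutionResultSketch toJobExecutionResult() {
        return new JobExecutionResultSketch(locations);
    }
}

/** Client side: the result handed to the environment. */
final class JobExecutionResultSketch {
    final Map<String, List<String>> locations;

    JobExecutionResultSketch(Map<String, List<String>> locations) {
        this.locations = locations;
    }
}

/** Client side: remembers the locations reported by finished jobs. */
final class ClientEnvironmentSketch {
    private final Map<String, List<String>> knownLocations = new HashMap<>();

    void onJobFinished(JobExecutionResultSketch result) {
        knownLocations.putAll(result.locations);
    }

    List<String> locationsOf(String dataSetId) {
        return knownLocations.getOrDefault(dataSetId, Collections.emptyList());
    }
}
```

In this sketch the snapshot is taken once, when the graph is archived, so the
later stages only forward a read-only view and never copy or mutate it.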
Failure Handling
- If an error occurs while collecting the locations of {{BLOCKING_PERSISTENT}}
{{ResultPartition}}s, we do not terminate the process. The locations of the
affected {{IntermediateDataSet}}s are left incomplete; we record their
{{IntermediateDataSetID}}s and report them back to the client
- The client can then use this information to decide what to do. Generally,
the data can be read if the locations are complete; if they are incomplete, a
delete request will be proposed (in later PRs)
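The failure handling above might look roughly like the following sketch: a
failed lookup does not abort collection, and the affected data set is only
marked as incomplete. The names {{CollectedLocationsSketch}},
{{LocationCollectorSketch}} and the {{locationLookup}} function are hypothetical
and chosen for illustration; the real collection logic lives in the pull request
and may differ.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;

/** Hypothetical holder for the outcome of collection; not a Flink class. */
final class CollectedLocationsSketch {
    // data set ID -> locations gathered so far (possibly partial)
    final Map<String, List<String>> locationsByDataSet = new LinkedHashMap<>();
    // IDs of data sets for which at least one location lookup failed
    final Set<String> incompleteDataSets = new HashSet<>();

    /** A data set is readable only if it was seen and none of its lookups failed. */
    boolean isComplete(String dataSetId) {
        return locationsByDataSet.containsKey(dataSetId)
                && !incompleteDataSets.contains(dataSetId);
    }
}

final class LocationCollectorSketch {
    /**
     * Resolves the location of every partition of every data set.
     * A failing lookup is recorded against its data set instead of aborting the loop.
     */
    static CollectedLocationsSketch collect(
            Map<String, List<String>> partitionsByDataSet,
            Function<String, String> locationLookup) {
        CollectedLocationsSketch result = new CollectedLocationsSketch();
        for (Map.Entry<String, List<String>> entry : partitionsByDataSet.entrySet()) {
            List<String> locations = new ArrayList<>();
            for (String partitionId : entry.getValue()) {
                try {
                    locations.add(locationLookup.apply(partitionId));
                } catch (RuntimeException e) {
                    // Keep going; remember that this data set is incomplete.
                    result.incompleteDataSets.add(entry.getKey());
                }
            }
            result.locationsByDataSet.put(entry.getKey(), locations);
        }
        return result;
    }
}
```

A client holding such a result could read a data set when {{isComplete}}
returns true and otherwise treat it as a candidate for the delete request
mentioned above.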
Brief change log
- Add a new class {{ResultPartitionDescriptor}}, which stores the location of a
{{BLOCKING_PERSISTENT}} {{ResultPartition}}. Currently, only
{{ResultPartition}}s in a {{TaskManager}} are supported.
- Add a new class {{BlockingPersistentResultPartitionMeta}}, which contains
all mappings from an {{IntermediateDataSetID}} to its {{BLOCKING_PERSISTENT}}
{{ResultPartition}} locations
- Add a new method {{getBlockingPersistentResultPartitionMeta()}} to
{{AccessExecutionGraph}}, which returns a
{{BlockingPersistentResultPartitionMeta}}
- Add an instance of {{BlockingPersistentResultPartitionMeta}} to
{{JobExecutionResult}}, {{JobResult}}, {{ArchivedExecutionGraph}} and
{{ExecutionEnvironment}}
- When a job finishes, the locations flow along this path:
{{ExecutionGraph}} -> {{ArchivedExecutionGraph}} -> {{JobResult}} ->
{{JobExecutionResult}} -> {{ExecutionEnvironment}}
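To make the two new classes more concrete, here is a minimal sketch of a
per-partition descriptor and a metadata object that maps a data set ID to its
descriptors. The class names, the fields ({{taskManagerAddress}},
{{partitionId}}) and the use of plain strings for IDs are assumptions made for
illustration; the issue does not specify the fields of
{{ResultPartitionDescriptor}} or {{BlockingPersistentResultPartitionMeta}}.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical descriptor: where one partition lives on a TaskManager. */
final class PartitionDescriptorSketch {
    final String taskManagerAddress; // assumed field, e.g. "host:port"
    final String partitionId;        // assumed field

    PartitionDescriptorSketch(String taskManagerAddress, String partitionId) {
        this.taskManagerAddress = taskManagerAddress;
        this.partitionId = partitionId;
    }
}

/** Hypothetical metadata: data set ID -> descriptors of its partitions. */
final class PartitionMetaSketch {
    private final Map<String, List<PartitionDescriptorSketch>> byDataSet = new HashMap<>();

    /** Registers one more partition location for the given data set. */
    void add(String dataSetId, PartitionDescriptorSketch descriptor) {
        byDataSet.computeIfAbsent(dataSetId, k -> new ArrayList<>()).add(descriptor);
    }

    /** Returns a read-only view; empty if the data set is unknown. */
    List<PartitionDescriptorSketch> locationsOf(String dataSetId) {
        return Collections.unmodifiableList(
                byDataSet.getOrDefault(dataSetId, Collections.emptyList()));
    }
}
```

Returning a read-only view keeps callers from altering the metadata while it is
shared between the result objects and the environment.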
was:
After each job finishes, new {{BLOCKING_PERSISTENT}} result partitions are
generated. The locations of these result partitions should be reported back to
the client via {{JobExecutionResult}}; they will later be used for Table
{{cache()}} and {{invalidateCache()}}.
Brief Changes:
- Add a new class {{IntermediateResultDescriptor}}, which stores the location
of a {{BLOCKING_PERSISTENT}} {{ResultPartition}}. Currently, only
{{ResultPartition}}s in a {{TaskManager}} are supported.
- Add a new method {{getResultPartitionDescriptors()}} to
{{AccessExecutionGraph}}
- Add a new field to {{JobExecutionResult}}, {{JobResult}},
{{ArchivedExecutionGraph}} and {{ExecutionEnvironment}}, which keeps a mapping
from an {{IntermediateDataSetID}} to its {{ResultPartition}} locations
- When a job finishes, the metadata flows along this path:
{{ExecutionGraph}} -> {{ArchivedExecutionGraph}} -> {{JobResult}} ->
{{JobExecutionResult}} -> {{ExecutionEnvironment}}
> Report BLOCKING_PERSISTENT result partition meta back to client
> ---------------------------------------------------------------
>
> Key: FLINK-12406
> URL: https://issues.apache.org/jira/browse/FLINK-12406
> Project: Flink
> Issue Type: Sub-task
> Components: API / DataSet, Runtime / Coordination
> Reporter: Ruidong Li
> Assignee: Ruidong Li
> Priority: Major
> Labels: pull-request-available
> Time Spent: 10m
> Remaining Estimate: 0h