[
https://issues.apache.org/jira/browse/TEZ-2581?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14993545#comment-14993545
]
Jeff Zhang edited comment on TEZ-2581 at 11/6/15 11:43 AM:
-----------------------------------------------------------
bq. Before we go into that, can you please confirm that a Vertex will be re-run
from scratch with original initializer and vertex manager if the
reconfigureDoneEvent is not recovered for that vertex right? The
VertexInitGeneratedEvent is not relevant because a VM could reconfigureDone
with 0 inputs in the vertex.
Yes, I can confirm. VertexRecoveryData#shouldSkipInit controls that: it requires
that both VertexInitedEvent and VertexReconfigureDoneEvent are non-null.
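To illustrate the rule (a minimal sketch only, assuming VertexRecoveryData simply carries the two recovered events; the field and accessor names below are my own paraphrase, not necessarily the patch's):
{code}
// Hedged sketch of the recovery-skip check described above; the real
// VertexRecoveryData in the patch may use different fields and types.
public class VertexRecoveryDataSketch {
  private final Object vertexInitedEvent;          // recovered VertexInitedEvent, or null
  private final Object vertexReconfigureDoneEvent; // recovered VertexReconfigureDoneEvent, or null

  public VertexRecoveryDataSketch(Object vertexInitedEvent, Object vertexReconfigureDoneEvent) {
    this.vertexInitedEvent = vertexInitedEvent;
    this.vertexReconfigureDoneEvent = vertexReconfigureDoneEvent;
  }

  // Skip re-initialization only when both events were recovered; otherwise the
  // vertex re-runs from scratch with the original initializer and vertex manager.
  public boolean shouldSkipInit() {
    return vertexInitedEvent != null && vertexReconfigureDoneEvent != null;
  }
}
{code}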
bq. From what I see in the code and from the name vertexReconfigurePlanned - I
think the understanding/usage is not accurate. E.g. even if
vertexReconfigurePlanned() API is not invoked we can still be in case 1 (-1 ->
numTasks). In fact this is the common case. So we should ignore those VM
semantics completely and try to set a flag that shows whether eventually
setParallelism (or reconfigureVertex) was called to change the vertex state.
Not sure what you mean. vertexReconfigurePlanned is only called for case 2
(numTask1 -> numTask2); in case 1, vertexReconfigurePlanned would not be called.
Currently I create a new flag in VertexReconfigureDoneEvent to differentiate
these two cases, and I make most of the changes in NoOpVertexManager to
minimize the impact on the existing workflow. Here's the code:
{code}
@Override
public void initialize() throws Exception {
  LOG.info("initialize NoOpVertexManager");
  // Rebuild the recovered VertexConfigurationDoneEvent from the user payload.
  reconfigureDoneEvent = new VertexConfigurationDoneEvent();
  reconfigureDoneEvent.fromProtoStream(
      new ByteArrayInputStream(getContext().getUserPayload().deepCopyAsArray()));
  if (reconfigureDoneEvent.isReconfigurePlanned()) {
    // Case 2 (numTask1 -> numTask2): declare the planned reconfiguration now
    // and replay it in onVertexStarted.
    getContext().vertexReconfigurationPlanned();
  } else {
    // Case 1 (-1 -> numTasks): apply the recovered parallelism immediately.
    if (reconfigureDoneEvent.isSetParallelismCalled()) {
      getContext().reconfigureVertex(reconfigureDoneEvent.getRootInputSpecUpdates(),
          reconfigureDoneEvent.getVertexLocationHint(),
          reconfigureDoneEvent.getNumTasks());
    }
  }
}

@Override
public void onVertexStarted(List<TaskAttemptIdentifier> completions) throws Exception {
  // apply the ReconfigureDoneEvent and then schedule all the tasks.
  LOG.debug("onVertexStarted is invoked in NoOpVertexManager, vertex="
      + getContext().getVertexName());
  if (reconfigureDoneEvent.isReconfigurePlanned()) {
    if (reconfigureDoneEvent.isSetParallelismCalled()) {
      Map<String, EdgeManagerPluginDescriptor> sourceEdgeProperties =
          new HashMap<String, EdgeManagerPluginDescriptor>();
      for (Map.Entry<String, EdgeProperty> entry :
          reconfigureDoneEvent.getSourceEdgeProperties().entrySet()) {
        sourceEdgeProperties.put(entry.getKey(), entry.getValue().getEdgeManagerDescriptor());
      }
      getContext().reconfigureVertex(reconfigureDoneEvent.getNumTasks(),
          reconfigureDoneEvent.getVertexLocationHint(),
          sourceEdgeProperties,
          reconfigureDoneEvent.getRootInputSpecUpdates());
    }
    getContext().doneReconfiguringVertex();
  }
  // schedule tasks
}
{code}
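For context, the other half of this flow is where the flag gets recorded on the original (non-recovery) run. A rough sketch of the intent, assuming the vertex tracks whether vertexReconfigurationPlanned() and setParallelism/reconfigureVertex were invoked and persists both into the VertexConfigurationDoneEvent; class and method names below are illustrative assumptions, not the actual TEZ-2581 patch code:
{code}
// Illustrative sketch only: distinguishes case 1 (-1 -> numTasks, no planned
// reconfiguration) from case 2 (numTask1 -> numTask2, VM called
// vertexReconfigurationPlanned()) so NoOpVertexManager can replay the right path.
public class ReconfigureStateTrackerSketch {
  private boolean reconfigurePlanned = false;    // set when the VM declares case 2
  private boolean setParallelismCalled = false;  // set when parallelism is changed

  public void onVertexReconfigurationPlanned() {
    reconfigurePlanned = true;
  }

  public void onReconfigureVertex() {
    setParallelismCalled = true;
  }

  /** The flags that would be written into the recovery event. */
  public String describeEventFlags() {
    return "isReconfigurePlanned=" + reconfigurePlanned
        + ", isSetParallelismCalled=" + setParallelismCalled;
  }
}
{code}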
> Umbrella for Tez Recovery Redesign
> ----------------------------------
>
> Key: TEZ-2581
> URL: https://issues.apache.org/jira/browse/TEZ-2581
> Project: Apache Tez
> Issue Type: Improvement
> Reporter: Jeff Zhang
> Assignee: Jeff Zhang
> Attachments: TEZ-2581-WIP-1.patch, TEZ-2581-WIP-10.patch,
> TEZ-2581-WIP-2.patch, TEZ-2581-WIP-3.patch, TEZ-2581-WIP-4.patch,
> TEZ-2581-WIP-5.patch, TEZ-2581-WIP-6.patch, TEZ-2581-WIP-7.patch,
> TEZ-2581-WIP-8.patch, TEZ-2581-WIP-9.patch, TezRecoveryRedesignProposal.pdf,
> TezRecoveryRedesignV1.1.pdf
>
>