[ https://issues.apache.org/jira/browse/TEZ-2581?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14993545#comment-14993545 ]

Jeff Zhang edited comment on TEZ-2581 at 11/6/15 11:43 AM:
-----------------------------------------------------------

bq. Before we go into that, can you please confirm that a Vertex will be re-run 
from scratch with original initializer and vertex manager if the 
reconfigureDoneEvent is not recovered for that vertex right? The 
VertexInitGeneratedEvent is not relevant because a VM could reconfigureDone 
with 0 inputs in the vertex.
Yes, I can confirm. VertexRecoveryData#shouldSkipInit controls that: it requires
both VertexInitedEvent and VertexReconfigureDoneEvent to be non-null.
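A minimal sketch of that rule, assuming simplified stand-in types: VertexRecoveryDataSketch and the empty event classes below are hypothetical placeholders, not the classes from the patch. Init is skipped only when both events were recovered; if either one is missing, the vertex is re-run from scratch with its original initializer and vertex manager.

```java
// Hypothetical, simplified stand-in for VertexRecoveryData. The real class in the
// TEZ-2581 patch holds all recovered history events for one vertex.
public class VertexRecoveryDataSketch {

  // Placeholder event types (assumption: the real events carry much more state).
  static final class VertexInitedEvent {}
  static final class VertexReconfigureDoneEvent {}

  private final VertexInitedEvent vertexInitedEvent;
  private final VertexReconfigureDoneEvent vertexReconfigureDoneEvent;

  VertexRecoveryDataSketch(VertexInitedEvent inited,
      VertexReconfigureDoneEvent reconfigureDone) {
    this.vertexInitedEvent = inited;
    this.vertexReconfigureDoneEvent = reconfigureDone;
  }

  // Skip init only when BOTH events were recovered; otherwise the vertex
  // goes through its original initializer and vertex manager again.
  boolean shouldSkipInit() {
    return vertexInitedEvent != null && vertexReconfigureDoneEvent != null;
  }

  public static void main(String[] args) {
    VertexInitedEvent inited = new VertexInitedEvent();
    VertexReconfigureDoneEvent done = new VertexReconfigureDoneEvent();
    System.out.println(new VertexRecoveryDataSketch(inited, done).shouldSkipInit()); // true
    System.out.println(new VertexRecoveryDataSketch(inited, null).shouldSkipInit()); // false
    System.out.println(new VertexRecoveryDataSketch(null, done).shouldSkipInit());   // false
  }
}
```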

bq. From what I see in the code and from the name vertexReconfigurePlanned - I 
think the understanding/usage is not accurate. E.g. even if 
vertexReconfigurePlanned() API is not invoked we can still be in case 1 (-1 -> 
numTasks). In fact this is the common case. So we should ignore those VM 
semantics completely and try to set a flag that shows whether eventually 
setParallelism (or reconfigureVertex) was called to change the vertex state.
Not sure what you mean. vertexReconfigurePlanned is only called for case 2
(numTask1 -> numTask2); in case 1 it is not called. I have added a new flag to
VertexReconfigureDoneEvent to distinguish these two cases, and put most of the
changes in NoOpVertexManager to minimize the impact on the existing workflow.
Here's the code:
{code}
  @Override
  public void initialize() throws Exception {
    LOG.info("initialize NoOpVertexManager");
    reconfigureDoneEvent = new VertexConfigurationDoneEvent();
    reconfigureDoneEvent.fromProtoStream(
        new ByteArrayInputStream(getContext().getUserPayload().deepCopyAsArray()));
    if (reconfigureDoneEvent.isReconfigurePlanned()) {
      // case 2 (numTask1 -> numTask2): the original VM planned a reconfiguration,
      // so declare it here and apply it later in onVertexStarted()
      getContext().vertexReconfigurationPlanned();
    } else if (reconfigureDoneEvent.isSetParallelismCalled()) {
      // case 1 (-1 -> numTasks): apply the recovered parallelism immediately
      getContext().reconfigureVertex(reconfigureDoneEvent.getRootInputSpecUpdates(),
          reconfigureDoneEvent.getVertexLocationHint(),
          reconfigureDoneEvent.getNumTasks());
    }
  }

  @Override
  public void onVertexStarted(List<TaskAttemptIdentifier> completions)
      throws Exception {
    // apply the ReconfigureDoneEvent and then schedule all the tasks.
    LOG.debug("onVertexStarted is invoked in NoOpVertexManager, vertex="
        + getContext().getVertexName());
    if (reconfigureDoneEvent.isReconfigurePlanned()) {
      if (reconfigureDoneEvent.isSetParallelismCalled()) {
        Map<String, EdgeManagerPluginDescriptor> sourceEdgeProperties =
            new HashMap<String, EdgeManagerPluginDescriptor>();
        for (Map.Entry<String, EdgeProperty> entry
            : reconfigureDoneEvent.getSourceEdgeProperties().entrySet()) {
          sourceEdgeProperties.put(entry.getKey(),
              entry.getValue().getEdgeManagerDescriptor());
        }
        getContext().reconfigureVertex(reconfigureDoneEvent.getNumTasks(),
            reconfigureDoneEvent.getVertexLocationHint(),
            sourceEdgeProperties,
            reconfigureDoneEvent.getRootInputSpecUpdates());
      }
      // signal that the planned reconfiguration is complete
      getContext().doneReconfiguringVertex();
    }
    // schedule tasks
    // ... (rest of the method is elided)
  }
{code}
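To make the control flow of the snippet easier to check, here is a hypothetical model (ReplayPlanSketch is illustrative only, not part of the patch). It maps the two recovered flags, isReconfigurePlanned() and isSetParallelismCalled(), to the sequence of VertexManagerPluginContext calls the snippet makes; the strings are just labels for those calls, and no Tez classes are used.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of how the recovered flags drive NoOpVertexManager.
public class ReplayPlanSketch {

  // Returns, in order, the calls the snippet would make for the given flags.
  static List<String> replay(boolean reconfigurePlanned, boolean setParallelismCalled) {
    List<String> calls = new ArrayList<>();
    // initialize()
    if (reconfigurePlanned) {
      calls.add("initialize: vertexReconfigurationPlanned");
    } else if (setParallelismCalled) {
      calls.add("initialize: reconfigureVertex(rootInputSpecUpdates, locationHint, numTasks)");
    }
    // onVertexStarted()
    if (reconfigurePlanned) {
      if (setParallelismCalled) {
        calls.add("onVertexStarted: reconfigureVertex(numTasks, locationHint, edges, rootInputSpecUpdates)");
      }
      calls.add("onVertexStarted: doneReconfiguringVertex");
    }
    calls.add("onVertexStarted: schedule tasks");
    return calls;
  }

  public static void main(String[] args) {
    System.out.println("case 1 (-1 -> numTasks):");
    replay(false, true).forEach(c -> System.out.println("  " + c));
    System.out.println("case 2 (numTask1 -> numTask2):");
    replay(true, true).forEach(c -> System.out.println("  " + c));
  }
}
```

Case 1 corresponds to replay(false, true) and case 2 to replay(true, true); the planned-but-unchanged case, replay(true, false), still calls doneReconfiguringVertex so the vertex is not left waiting.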





> Umbrella for Tez Recovery Redesign
> ----------------------------------
>
>                 Key: TEZ-2581
>                 URL: https://issues.apache.org/jira/browse/TEZ-2581
>             Project: Apache Tez
>          Issue Type: Improvement
>            Reporter: Jeff Zhang
>            Assignee: Jeff Zhang
>         Attachments: TEZ-2581-WIP-1.patch, TEZ-2581-WIP-10.patch, 
> TEZ-2581-WIP-2.patch, TEZ-2581-WIP-3.patch, TEZ-2581-WIP-4.patch, 
> TEZ-2581-WIP-5.patch, TEZ-2581-WIP-6.patch, TEZ-2581-WIP-7.patch, 
> TEZ-2581-WIP-8.patch, TEZ-2581-WIP-9.patch, TezRecoveryRedesignProposal.pdf, 
> TezRecoveryRedesignV1.1.pdf
>
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
