[ 
https://issues.apache.org/jira/browse/TEZ-714?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14484616#comment-14484616
 ] 

Bikas Saha commented on TEZ-714:
--------------------------------

Since we wait for all pending commits to complete or be cancelled, why do we 
need to ignore these events in the KILLED/FAILED states? They should never 
arrive there, because they will be handled in the TERMINATING state, right?
{code}          // Ignore-able events
          .addTransition(VertexState.FAILED, VertexState.FAILED,
              EnumSet.of(VertexEventType.V_TERMINATE,
                  VertexEventType.V_COMMIT_COMPLETED,{code}

Why has the committed check been removed? It does not hurt to keep it. The 
check may have been added after observing a bug that we are not considering 
when we remove it in this patch.
{code}
          if (vertex.commitVertexOutputs && !vertex.committed.getAndSet(true)) {
{code}
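
For reference, a minimal sketch of what the guard above protects against. The class and field names are hypothetical stand-ins, not the VertexImpl code; the only point is that getAndSet(true) lets exactly one caller through, even if the commit path is reached twice.

```java
import java.util.concurrent.atomic.AtomicBoolean;

class CommitOnceSketch {
    // Hypothetical stand-ins for the vertex fields referenced in the quoted line.
    private final AtomicBoolean committed = new AtomicBoolean(false);
    private final boolean commitVertexOutputs = true;
    private int commitRuns = 0;

    /** Returns true only for the call that actually performed the commit. */
    boolean commitIfNeeded() {
        // getAndSet(true) returns the previous value, so only the first call sees false.
        if (commitVertexOutputs && !committed.getAndSet(true)) {
            commitRuns++; // placeholder for the real commit work
            return true;
        }
        return false;
    }

    int getCommitRuns() {
        return commitRuns;
    }
}
```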

New TODO??
{code}+            // TODO should be RECOVERY_ERROR
+            vertex.trySetTerminationCause(VertexTerminationCause.INTERNAL_ERROR);
{code}

Should we move abortVertex() into finished(), similar to DAGImpl?

Then there is no need to call abortVertex() here. Also, should this say 
"return finished(VertexState.FAILED);"?
{code}
+            vertex.trySetTerminationCause(VertexTerminationCause.INTERNAL_ERROR);
+            vertex.abortVertex(VertexStatus.State.FAILED);
+            return VertexState.FAILED;{code}

Can we add a precondition that checks that the current vertex state is 
COMMITTING? And similarly, in the else branch, that the current vertex state 
is TERMINATING?
{code}    if (vertex.terminationCause == null) {
      if (vertex.commitFutures.isEmpty()) {
        // move from COMMITTING to SUCCEEDED
        return vertex.finished(VertexState.SUCCEEDED);{code}
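
A rough sketch of the kind of check I mean. The enum, the method names and the FAILED result in the else branch are assumptions for illustration only. Guava's Preconditions.checkState would do the same job as the hand-written helper here.

```java
import java.util.Set;

class CommitStateCheckSketch {
    // Hypothetical subset of the vertex states discussed in this review.
    enum State { RUNNING, COMMITTING, TERMINATING, SUCCEEDED, FAILED }

    /** Fails fast when the state machine is not in the expected state. */
    static void checkState(State actual, State expected) {
        if (actual != expected) {
            throw new IllegalStateException(
                "Expected " + expected + " but vertex is in " + actual);
        }
    }

    /** Same shape as the quoted snippet; a null terminationCause means nothing has failed. */
    static State onCommitCompleted(State current, String terminationCause,
                                   Set<String> pendingCommits) {
        if (terminationCause == null) {
            checkState(current, State.COMMITTING);
            return pendingCommits.isEmpty() ? State.SUCCEEDED : State.COMMITTING;
        } else {
            checkState(current, State.TERMINATING);
            // Assumption: the terminal state is simplified to FAILED in this sketch.
            return pendingCommits.isEmpty() ? State.FAILED : State.TERMINATING;
        }
    }
}
```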

TaskCompletedAfterVertexSuccessTransition has an unnecessary diff?
{code}
      vertex.finished(finalState, VertexTerminationCause.OWN_TASK_FAILURE, diagnosticMsg);
      return finalState;
    }{code}

Add a precondition that something was actually removed?
{code}  private void commitCompleted(VertexEventCommitCompleted commitCompletedEvent) {
    commitFutures.remove(commitCompletedEvent.getOutputName());
    if (commitCompletedEvent.isSucceeded()) {
{code}
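
Something along these lines, as a sketch only. The class is hypothetical and the futures are modeled as plain objects; the idea is just to look at the return value of Map.remove, which is null when the key was not present.

```java
import java.util.HashMap;
import java.util.Map;

class CommitFuturesSketch {
    // Hypothetical map from output name to its in-flight commit.
    private final Map<String, Object> commitFutures = new HashMap<>();

    void register(String outputName, Object future) {
        commitFutures.put(outputName, future);
    }

    void commitCompleted(String outputName) {
        // remove() returns the previous value, or null if nothing was mapped.
        Object removed = commitFutures.remove(outputName);
        if (removed == null) {
            throw new IllegalStateException(
                "Unknown or duplicate commit completion for output: " + outputName);
        }
    }

    int pending() {
        return commitFutures.size();
    }
}
```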

commitOutput() does not need the outputName parameter.
{code}  private void commitOutput(String outputName, OutputCommitter outputCommitter) throws Exception {
    final OutputCommitter committer = outputCommitter;
    getDagUGI().doAs(new PrivilegedExceptionAction<Void>() {
{code}

These comments should probably go at the head of the function.
{code}    // we come here for successful dag completion and when outputs need to be
    // committed at the end for all or none visibility
    for (final Vertex vertex : vertices.values()) {{code}

Misleading log message. It should say "writing history" or something like that.
{code}      try {
        LOG.info("Calling DAG commit for dag: " + getID());{code}

It would be good to have some precondition checks for the current state here, 
as in the VertexImpl comments earlier.
{code}    if (dag.terminationCause == null) {
      if (!dag.commitFutures.isEmpty()) {
        // pending commits are running
        return DAGState.COMMITTING;
      } else {
        return dag.finished(DAGState.SUCCEEDED);
      }
    } else {
      if (!dag.commitFutures.isEmpty()) {
        // pending commits are running
        return DAGState.TERMINATING;
      } else {
        return finishWithTerminationCause(dag);{code}

New TODO?
{code}
    // TODO need to clean up code, consolidate the common code for vertex failed/killed/error
{code}

Why are commits cancelled for FAILED but not for KILLED?
{code}      else if (vertexEvent.getVertexState() == VertexState.FAILED) {
        job.enactKill(
            DAGTerminationCause.VERTEX_FAILURE, VertexTerminationCause.OTHER_VERTEX_FAILURE);
        job.cancelCommits();
        job.vertexFailed(vertex);
        forceTransitionToKillWait = true;
      }
      else if (vertexEvent.getVertexState() == VertexState.KILLED) {
        job.vertexKilled(vertex);
        forceTransitionToKillWait = true;
      }{code}

Rename to commitCompleted()?
{code}
  private boolean commitComplete(DAGEventCommitCompleted commitCompletedEvent) {
{code}

Please open a JIRA and refer to it instead of leaving a TODO.
{code}  // TODO go to TERMINATING to wait for all vertices and commits completed{code}

This can just be called getException(). It is obvious that something has 
failed, right?
{code}
  public Throwable getFailException() {
    return failException;{code}


Empty?
{code}
  @Test(timeout = 5000)
  public void testVertexRerunningWhileCommitting() {
    
  }
{code}

Shouldn't this be (true, true)? If not, please change the comment.
{code}    // both committers fail
    DAG dag4 = createDAG("testDAGBothCommitsFail", false, true);{code}

testCommitOutputOnDAGSuccess(): The assert conditions are the same for all 3 
cases. How are we differentiating between them?

Should v3 be KILLED instead of FAILED? This is case 1 in 
testCommitOutputOnVertexSuccess().
{code}    // v3 either succeeded (commit completed before uv12 commit fails)
    // or failed ( uv12 commit fail when v3 is in running/committing)
    if (v3State.equals(VertexStatus.State.SUCCEEDED)) {
      LOG.info("v3 is succeeded");
    } else {
      Assert.assertEquals(VertexStatus.State.FAILED, v3State);
    }{code}

Should this be (true, true)?
{code}    // both committers fail
    DAG dag4 = createDAG("testDAGBothCommitsFail", false, true);
    dagClient = tezClient.submitDAG(dag4);
{code}

Again, many of the asserts look the same and do not differentiate between the 
cases.

Typo: XXX_OnVertexFinsih should be XXX_OnVertexFinish.

Please use wait/notify instead of a polling while loop.
{code}    @Override
    public void commitOutput() throws IOException {
      ++commitCounter;
      while (blockCommit) {
        try {
          Thread.sleep(100);
          LOG.info("committing output:" + getContext().getOutputName());
        } catch (InterruptedException e) {
          throw new IOException(e);
        }
      }
      if (throwError) {
        throw new RuntimeException("I can throwz exceptions in commit");
      }
    }

    public void unblockCommit() {
      blockCommit = false;
    }
{code}
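
A possible shape for this, as a sketch rather than a drop-in replacement. The class name and the lock object are made up, and the throwError branch and the OutputCommitter base class are left out. The caller blocks in wait() until unblockCommit() flips the flag and calls notifyAll().

```java
import java.io.IOException;

class BlockingCommitterSketch {
    private final Object lock = new Object();
    private boolean blockCommit = true; // guarded by lock
    private int commitCounter = 0;      // guarded by lock

    public void commitOutput() throws IOException {
        synchronized (lock) {
            ++commitCounter;
            // The loop guards against spurious wakeups; there is no sleep-based polling.
            while (blockCommit) {
                try {
                    lock.wait();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new IOException(e);
                }
            }
        }
    }

    public void unblockCommit() {
        synchronized (lock) {
            blockCommit = false;
            lock.notifyAll(); // wake every thread blocked in commitOutput()
        }
    }

    public int getCommitCounter() {
        synchronized (lock) {
            return commitCounter;
        }
    }
}
```

The test thread then returns from commitOutput() as soon as unblockCommit() is called, instead of up to 100 ms later.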

How is this test checking that one commit causes the other to abort, if both 
committers are set up to fail?
{code}  // the first commit fail which cause the second commit abort
  @Test(timeout = 5000)
  public void testVertexCommitFail1_OnVertexSuccess() throws Exception {
    conf.setBoolean(TezConfiguration.TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS,
        false);
    setupDAG(createDAGPlan_SingleVertexWith2Committer(false, false));  <<<<<<<< both fail
{code}

How is ordering guaranteed here? It is not, unless the test is using a 
single-threaded shared thread pool.
{code}  // the first commit succeed while the second fails
  @Test(timeout = 5000)
  public void testVertexCommitFail2_OnVertexSuccess() throws Exception {
{code}
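
To illustrate the point about ordering, here is a small sketch using only java.util.concurrent. The class and method names are hypothetical. With Executors.newSingleThreadExecutor() tasks run one at a time in submission order, so "first" and "second" commit are well defined. With a multi-threaded pool they are not.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class OrderedCommitSketch {
    /** Runs one fake commit per output name and returns the order in which they ran. */
    static List<String> runInOrder(List<String> outputNames) throws InterruptedException {
        List<String> order = Collections.synchronizedList(new ArrayList<String>());
        // A single worker thread executes queued tasks sequentially, in submission order.
        ExecutorService pool = Executors.newSingleThreadExecutor();
        for (String name : outputNames) {
            pool.execute(() -> order.add(name));
        }
        pool.shutdown();
        if (!pool.awaitTermination(5, TimeUnit.SECONDS)) {
            throw new IllegalStateException("commits did not finish in time");
        }
        return order;
    }
}
```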

Why is v3 FAILED? It should be KILLED, because a commit error in another 
vertex should fail the DAG and kill the other vertices.
{code}    // v3 is failed due to the commit failure of the vertex group (v1,v2)
    Assert.assertEquals(VertexState.FAILED, v3.getState());
    Assert.assertEquals(VertexTerminationCause.COMMIT_FAILURE,{code}

Good set of test case combinations overall. 
We are missing the case where the commit is cancelled and it gets cancelled on 
the threadpool and invokes onFailure(). This may be possible to test if we set 
the threadpool to be a fixed size of 1.
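
A sketch of how a pool of size 1 makes that case deterministic, using plain java.util.concurrent. The names are hypothetical and the onFailure() callback itself is not modeled here. The first task occupies the only worker, so the second is still queued when it is cancelled and never runs.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

class CancelQueuedCommitSketch {
    /** Returns true if the second commit was cancelled before it ever started. */
    static boolean cancelWhileQueued() throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(1);
        CountDownLatch release = new CountDownLatch(1);
        AtomicBoolean secondRan = new AtomicBoolean(false);
        try {
            // The first commit holds the only worker thread until released.
            Future<?> first = pool.submit(() -> {
                release.await();
                return null;
            });
            // The second commit can only sit in the queue behind the first.
            Future<?> second = pool.submit(() -> secondRan.set(true));
            boolean cancelled = second.cancel(true);
            release.countDown();
            first.get(5, TimeUnit.SECONDS);
            return cancelled && second.isCancelled() && !secondRan.get();
        } finally {
            pool.shutdownNow();
        }
    }
}
```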

This is not a valid assumption. Let's fix the other comments and come back to 
this later.
{code}
          // abort operation should take no side effort on the successful commit
          Assert.assertEquals(1, committer.abortCounter);
{code}


> OutputCommitters should not run in the main AM dispatcher thread
> ----------------------------------------------------------------
>
>                 Key: TEZ-714
>                 URL: https://issues.apache.org/jira/browse/TEZ-714
>             Project: Apache Tez
>          Issue Type: Improvement
>            Reporter: Siddharth Seth
>            Assignee: Jeff Zhang
>            Priority: Critical
>         Attachments: DAG_2.pdf, TEZ-714-1.patch, TEZ-714-2.patch, 
> TEZ-714-3.patch, TEZ-714-4.patch, TEZ-714-5.patch, TEZ-714-6.patch, 
> TEZ-714-7.patch, TEZ-714-8.patch, Vertex_2.pdf
>
>
> Follow up jira from TEZ-41.
> 1) If there's multiple OutputCommitters on a Vertex, they can be run in 
> parallel.
> 2) Running an OutputCommitter in the main thread blocks all other event 
> handling, w.r.t the DAG, and causes the event queue to back up.
> 3) This should also cover shared commits that happen in the DAG.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
