[
https://issues.apache.org/jira/browse/TEZ-714?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14381238#comment-14381238
]
Bikas Saha commented on TEZ-714:
--------------------------------
Typo: commitCancled should be commitCanceled.
{code}
+ private AtomicBoolean commitCancled = new AtomicBoolean(false);
boolean commitAllOutputsOnSuccess = true;
{code}
Most of these should not be ignored because there is a bug if any of these
events actually come in during commit. The possible exception is the vertex
manager user code error, which can be ignored.
{code} .addTransition(VertexState.COMMITTING, VertexState.COMMITTING,
EnumSet.of(
VertexEventType.V_MANAGER_USER_CODE_ERROR,
VertexEventType.V_ROOT_INPUT_FAILED,
VertexEventType.V_SOURCE_VERTEX_STARTED,
VertexEventType.V_ROOT_INPUT_INITIALIZED,
VertexEventType.V_NULL_EDGE_INITIALIZED,
VertexEventType.V_SOURCE_TASK_ATTEMPT_COMPLETED,
VertexEventType.V_TASK_ATTEMPT_COMPLETED)){code}
Why is this now public?
{code} public void abortVertex(final VertexStatus.State finalState) {
{code}
Where is abort being called on all outputs when the vertex/dag fails? (The
failure could be in a commit operation or due to an external cause.) Should we
wait for all outstanding commit operations to get cancelled or complete and
then call abort on all outputs?
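One possible ordering for the question above, as a rough sketch only. It
assumes commits run on a dedicated java.util.concurrent ExecutorService (the
patch itself uses ListenableFuture), and the Committer interface and the
method name are hypothetical stand-ins, not Tez APIs. The idea is to interrupt
outstanding commits, wait until they have actually stopped, and only then call
abort on every output.

```java
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

final class AbortAfterCommitsSketch {
  /** Hypothetical stand-in for an output committer. */
  interface Committer {
    void abort();
  }

  /**
   * Interrupts running commits, drops queued ones, waits for the executor to
   * terminate, then aborts every output. Returns true if all commit tasks had
   * stopped before abort was called.
   */
  static boolean cancelCommitsThenAbort(ExecutorService commitExecutor,
      List<Committer> committers, long timeoutMillis) {
    commitExecutor.shutdownNow(); // interrupt running tasks, discard queued ones
    boolean quiesced;
    try {
      quiesced = commitExecutor.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt(); // preserve the interrupt for the caller
      quiesced = false;
    }
    // Assumption: abort is attempted even if some commit ignored the interrupt.
    for (Committer committer : committers) {
      committer.abort();
    }
    return quiesced;
  }
}
```

Note that this only works if the commit tasks respond to interruption. A
commit that ignores the interrupt would still be running when abort is called.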
Why is this calling Vertex.abortVertex() instead of directly calling
committer.abort() for the outputs?
{code} if (commitAllOutputsOnSuccess) {
for (Vertex vertex : vertices.values()) {
((VertexImpl)vertex).abortVertex(VertexStatus.State.FAILED);
}{code}
Why has calling commit operations moved from DAG.finished() to
DAG.checkForCompletion()? finished() is expected to be called once but
checkForCompletion can be called any number of times. finished() may need to be
broken into 2 methods though to separate the parts which should happen after
commits are done.
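A minimal sketch of the split suggested above, with hypothetical names
throughout (DagCompletionSketch, startCommit, finishAfterCommit). It is not
the DAGImpl code. It shows a completion check that is safe to call repeatedly
because the commit step is guarded to run at most once, and a second step that
runs only after commits have started.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// All names here are hypothetical; this is not the DAGImpl code from the patch.
final class DagCompletionSketch {
  private final AtomicBoolean commitStarted = new AtomicBoolean(false);
  private final AtomicBoolean finalized = new AtomicBoolean(false);
  // Plain counters for illustration; assumes single-threaded dispatch.
  int commitsStarted = 0;
  int finalizations = 0;

  // Can be invoked any number of times, e.g. once per vertex completion.
  void checkForCompletion(boolean allVerticesSucceeded) {
    if (allVerticesSucceeded) {
      startCommit();
    }
  }

  // First half of the split finished(): kicks off commits at most once.
  void startCommit() {
    if (commitStarted.compareAndSet(false, true)) {
      commitsStarted++; // stand-in for submitting the commit operations
    }
  }

  // Second half: work that must wait until all commits are done.
  void finishAfterCommit() {
    if (commitStarted.get() && finalized.compareAndSet(false, true)) {
      finalizations++; // stand-in for history events, cleanup, final state
    }
  }
}
```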
In OutputKey, vertexName and vertexGroupName can be merged to make the code
paths similar. Where needed, indicating a group can be done via a boolean.
{code} for (Map.Entry<OutputKey, ListenableFuture<Void>> entry :
commitFutures.entrySet()) {
OutputKey outputKey = entry.getKey();
if (outputKey.vertexGroupName != null) {
LOG.info("Canceling commit of output:" + outputKey.getOutputName()
+ " of vertex group:" + outputKey.vertexGroupName);
} else {
LOG.info("Canceling commit of output:" + outputKey.getOutputName()
+ " of vertex:" + outputKey.vertexName);
}{code}
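To illustrate the merge suggested above, here is a rough sketch of a key with
a single name field plus a boolean. The class name OutputKeySketch, the field
entityName, and the describe() helper are all hypothetical. The actual
OutputKey in the patch may carry more state.

```java
import java.util.Objects;

// Hypothetical simplified key: one name field plus a flag, instead of
// separate vertexName and vertexGroupName fields.
final class OutputKeySketch {
  private final String outputName;
  private final String entityName;     // vertex name or vertex group name
  private final boolean isVertexGroup; // true when entityName is a vertex group

  OutputKeySketch(String outputName, String entityName, boolean isVertexGroup) {
    this.outputName = Objects.requireNonNull(outputName);
    this.entityName = Objects.requireNonNull(entityName);
    this.isVertexGroup = isVertexGroup;
  }

  String getOutputName() { return outputName; }
  String getEntityName() { return entityName; }
  boolean isVertexGroup() { return isVertexGroup; }

  // Single code path for log messages; only the label depends on the flag.
  String describe() {
    return "output:" + outputName + " of "
        + (isVertexGroup ? "vertex group:" : "vertex:") + entityName;
  }

  @Override
  public boolean equals(Object o) {
    if (this == o) return true;
    if (!(o instanceof OutputKeySketch)) return false;
    OutputKeySketch other = (OutputKeySketch) o;
    return isVertexGroup == other.isVertexGroup
        && outputName.equals(other.outputName)
        && entityName.equals(other.entityName);
  }

  @Override
  public int hashCode() {
    return Objects.hash(outputName, entityName, isVertexGroup);
  }
}
```

With a key like this, the cancel loop would need only one log statement, for
example LOG.info("Canceling commit of " + outputKey.describe()).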
Should this be private if it's accessed by derived classes? Is
CommitCompletedTransition used in the state machine? If not, then it does not
need to be a transition class.
{code} // either commitFail or recoveryFail
private boolean isFail = false;{code}
Why is this directly sending events instead of using a common method?
{code} if (super.isFail) {
for (Vertex vertex : dag.vertices.values()) {
((VertexImpl)vertex).handle(new
VertexEventTermination(vertex.getVertexId(),
VertexTerminationCause.OTHER_VERTEX_FAILURE));
}
return DAGState.TERMINATING;
{code}
Why is there no check for whether there are non-zero committers?
{code} private synchronized DAGState commitOrFinish() {
if (this.committed) {
LOG.info("Ignoring multiple output commit/abort");
if (commitFutures.isEmpty() && terminationCause == null) {
return finished(DAGState.SUCCEEDED);
} else {
return getState();
}
}
LOG.info("Calling DAG commit for dag: " + getID());
this.committed = true;
// commit all shared outputs
try {
appContext.getHistoryHandler().handleCriticalEvent(new
DAGHistoryEvent(getID(),
new DAGCommitStartedEvent(getID(), clock.getTime())));
} catch (IOException e) {
LOG.error("Failed to send commit event to history/recovery handler", e);
trySetTerminationCause(DAGTerminationCause.RECOVERY_FAILURE);
return DAGState.FAILED;
{code}
Thanks for incorporating the suggestions about the flow. The new code is much
simpler, though there may be some issues that may need ironing out if the above
comments are valid.
Haven't seen the tests yet.
> OutputCommitters should not run in the main AM dispatcher thread
> ----------------------------------------------------------------
>
> Key: TEZ-714
> URL: https://issues.apache.org/jira/browse/TEZ-714
> Project: Apache Tez
> Issue Type: Improvement
> Reporter: Siddharth Seth
> Assignee: Jeff Zhang
> Priority: Critical
> Attachments: DAG_2.pdf, TEZ-714-1.patch, TEZ-714-2.patch,
> TEZ-714-3.patch, TEZ-714-4.patch, Vertex_2.pdf
>
>
> Follow up jira from TEZ-41.
> 1) If there's multiple OutputCommitters on a Vertex, they can be run in
> parallel.
> 2) Running an OutputCommitter in the main thread blocks all other event
> handling, w.r.t the DAG, and causes the event queue to back up.
> 3) This should also cover shared commits that happen in the DAG.