[ 
https://issues.apache.org/jira/browse/FLINK-5867?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15984131#comment-15984131
 ] 

ASF GitHub Bot commented on FLINK-5867:
---------------------------------------

Github user shuai-xu commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3773#discussion_r113361061
  
    --- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailoverRegion.java
 ---
    @@ -0,0 +1,251 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.executiongraph.failover;
    +
    +import org.apache.flink.runtime.concurrent.AcceptFunction;
    +import org.apache.flink.runtime.concurrent.Future;
    +import org.apache.flink.runtime.concurrent.FutureUtils;
    +import org.apache.flink.runtime.executiongraph.Execution;
    +import org.apache.flink.runtime.executiongraph.ExecutionGraph;
    +import org.apache.flink.runtime.executiongraph.ExecutionVertex;
    +import org.apache.flink.runtime.executiongraph.GlobalModVersionMismatch;
    +import org.apache.flink.runtime.jobgraph.JobStatus;
    +import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
    +import org.apache.flink.util.AbstractID;
    +import org.apache.flink.util.FlinkException;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.util.ArrayList;
    +import java.util.Collection;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.concurrent.Executor;
    +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
    +
    +import static org.apache.flink.util.Preconditions.checkNotNull;
    +
    +/**
    + * FailoverRegion manages the failover of a minimal pipeline-connected subgraph.
    + * It starts in RUNNING; during a failover it moves to CANCELLING, then to CANCELED,
    + * then to CREATED, and is then restarted, returning to RUNNING.
    + */
    +public class FailoverRegion {
    +
    +   /** Updater for the volatile {@link #state} field, allowing atomic compare-and-set state transitions. */
    +   private static final AtomicReferenceFieldUpdater<FailoverRegion, 
JobStatus> STATE_UPDATER =
    +                   
AtomicReferenceFieldUpdater.newUpdater(FailoverRegion.class, JobStatus.class, 
"state");
    +
    +   /** The log object used for debugging. */
    +   private static final Logger LOG = 
LoggerFactory.getLogger(FailoverRegion.class);
    +
    +   // ------------------------------------------------------------------------
    +
    +   /** A unique id for this region, used to identify it in log messages. */
    +   private final AbstractID id = new AbstractID();
    +
    +   /** The execution graph whose restart strategy is consulted and which is failed globally if no restart is possible. */
    +   private final ExecutionGraph executionGraph;
    +
    +   /** The execution vertices that make up this region (the list passed to the constructor, not a copy). */
    +   private final List<ExecutionVertex> connectedExecutionVertexes;
    +
    +   /** The executor that runs the recovery callback once all vertices of this region are in a terminal state. */
    +   private final Executor executor;
    +
    +   /** Current state of this failover region (not of the whole job); a region starts out RUNNING. */
    +   private volatile JobStatus state = JobStatus.RUNNING;
    +
    +
    +   /**
    +    * Creates a failover region consisting of the given execution vertices.
    +    *
    +    * @param executionGraph the execution graph whose restart strategy and global failover this region uses
    +    * @param executor the executor that runs the recovery callback once all vertices of the region are terminal
    +    * @param connectedExecutions the execution vertices of this region; the list is stored as-is, not copied
    +    */
    +   public FailoverRegion(
    +                   ExecutionGraph executionGraph,
    +                   Executor executor,
    +                   List<ExecutionVertex> connectedExecutions) {
    +
    +           this.executionGraph = checkNotNull(executionGraph);
    +           this.executor = checkNotNull(executor);
    +           this.connectedExecutionVertexes = checkNotNull(connectedExecutions);
    +
    +           LOG.debug("Created failover region {} with vertices: {}", id, this.connectedExecutionVertexes);
    +   }
    +
    +   public void onExecutionFail(Execution taskExecution, Throwable cause) {
    +           // TODO: check if need to failover the preceding region
    +           if (!executionGraph.getRestartStrategy().canRestart()) {
    +                   // delegate the failure to a global fail that will 
check the restart strategy and not restart
    +                   executionGraph.failGlobal(cause);
    +           }
    +           else {
    +                   cancel(taskExecution.getGlobalModVersion());
    +           }
    +   }
    +
    +   private void allVerticesInTerminalState(long 
globalModVersionOfFailover) {
    +           while (true) {
    +                   JobStatus curStatus = this.state;
    +                   if (curStatus.equals(JobStatus.CANCELLING)) {
    +                           if (transitionState(curStatus, 
JobStatus.CANCELED)) {
    +                                   reset(globalModVersionOfFailover);
    +                                   break;
    +                           }
    +                   }
    +                   else {
    +                           LOG.info("FailoverRegion {} is {} when 
allVerticesInTerminalState.", id, state);
    +                           break;
    +                   }
    +           }
    +   }
    +
    +   /**
    +    * Returns the current state of this failover region.
    +    *
    +    * @return the region's state, read from its volatile state field
    +    */
    +   public JobStatus getState() {
    +           return this.state;
    +   }
    +
    +   /**
    +    * Returns the execution vertices contained in this region.
    +    *
    +    * <p>This is the list that was passed to the constructor, not a copy; callers should not modify it.
    +    *
    +    * @return the execution vertices of this region
    +    */
    +   public List<ExecutionVertex> getAllExecutionVertexes() {
    +           return this.connectedExecutionVertexes;
    +   }
    +
    +   // Notice the region to failover, 
    +   private void failover(long globalModVersionOfFailover) {
    +           if (!executionGraph.getRestartStrategy().canRestart()) {
    +                   executionGraph.failGlobal(new 
FlinkException("RestartStrategy validate fail"));
    +           }
    +           else {
    +                   JobStatus curStatus = this.state;
    +                   if (curStatus.equals(JobStatus.RUNNING)) {
    +                           cancel(globalModVersionOfFailover);
    +                   }
    +                   else if (curStatus.equals(JobStatus.CANCELED)) {
    +                           reset(globalModVersionOfFailover);
    +                   }
    +                   else {
    +                           LOG.info("FailoverRegion {} is {} when notified 
to failover.", id, state);
    +                   }
    +           }
    +   }
    +
    +   /**
    +    * Cancels all executions in this region if the region is RUNNING.
    +    *
    +    * <p>The region is moved to CANCELLING and every vertex is cancelled. Once all cancellation futures have
    +    * completed, {@link #allVerticesInTerminalState(long)} is invoked asynchronously on {@link #executor}.
    +    * If the region is not RUNNING, the call is only logged.
    +    *
    +    * @param globalModVersionOfFailover the global modification version the failover was started with
    +    */
    +   private void cancel(final long globalModVersionOfFailover) {
    +           while (true) {
    +                   final JobStatus curStatus = this.state;
    +                   if (!curStatus.equals(JobStatus.RUNNING)) {
    +                           // log the state that was actually checked, not a possibly newer re-read of the volatile field
    +                           LOG.info("FailoverRegion {} is {} when cancel.", id, curStatus);
    +                           return;
    +                   }
    +
    +                   if (transitionState(curStatus, JobStatus.CANCELLING)) {
    +                           // we build a future that is complete once all vertices have reached a terminal state
    +                           final List<Future<?>> futures = new ArrayList<>(connectedExecutionVertexes.size());
    +
    +                           // cancel all tasks (that still need cancelling)
    +                           for (ExecutionVertex vertex : connectedExecutionVertexes) {
    +                                   futures.add(vertex.cancel());
    +                           }
    +
    +                           // NOTE(review): the accept callback only runs on normal completion; if the combined
    +                           // future completes exceptionally, the region presumably stays CANCELLING - confirm.
    +                           final FutureUtils.ConjunctFuture allTerminal = FutureUtils.combineAll(futures);
    +                           allTerminal.thenAcceptAsync(new AcceptFunction<Void>() {
    +                                   @Override
    +                                   public void accept(Void value) {
    +                                           allVerticesInTerminalState(globalModVersionOfFailover);
    +                                   }
    +                           }, executor);
    +
    +                           return;
    +                   }
    +                   // transition failed, presumably due to a concurrent state change; re-read the state and retry
    +           }
    +   }
    +
    +   // reset all executions in this sub graph
    +   private void reset(long globalModVersionOfFailover) {
    +           try {
    +                   // reset all connected ExecutionVertexes
    +                   final Collection<CoLocationGroup> colGroups = new 
HashSet<>();
    +                   final long restartTimestamp = 
System.currentTimeMillis();
    +
    +                   for (ExecutionVertex ev : connectedExecutionVertexes) {
    +                           CoLocationGroup cgroup = 
ev.getJobVertex().getCoLocationGroup();
    +                           if (cgroup != null && 
!colGroups.contains(cgroup)){
    +                                   cgroup.resetConstraints();
    +                                   colGroups.add(cgroup);
    +                           }
    +
    +                           ev.resetForNewExecution(restartTimestamp, 
globalModVersionOfFailover);
    +                   }
    +                   if (transitionState(JobStatus.CANCELED, 
JobStatus.CREATED)) {
    +                           restart(globalModVersionOfFailover);
    +                   }
    +                   else {
    +                           LOG.info("FailoverRegion {} switched from 
CANCELLING to CREATED fail, will fail this region again.", id);
    +                           failover(globalModVersionOfFailover);
    +                   }
    +           }
    +           catch (GlobalModVersionMismatch e) {
    +                   // happens when a global recovery happens concurrently 
to the regional recovery
    +                   // go back to a clean state
    +                   state = JobStatus.CREATED;
    --- End diff --
    
    The initial state of FailoverRegion should be RUNNING: by the time a FailoverRegion is in use, the executions in it have always already been scheduled. The CREATED state is only used during the failover process.


> The implementation of RestartPipelinedRegionStrategy
> ----------------------------------------------------
>
>                 Key: FLINK-5867
>                 URL: https://issues.apache.org/jira/browse/FLINK-5867
>             Project: Flink
>          Issue Type: Sub-task
>          Components: JobManager
>            Reporter: shuai.xu
>            Assignee: shuai.xu
>
> The RestartPipelinedRegionStrategy's responsibilities are as follows:
> 1. Calculate all FailoverRegions and their relations during initialization.
> 2. Listen for failures of the job and its executions, and find the corresponding
> FailoverRegions to perform the failover.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

Reply via email to