[ 
https://issues.apache.org/jira/browse/FLINK-2790?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14946825#comment-14946825
 ] 

ASF GitHub Bot commented on FLINK-2790:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1213#discussion_r41387623
  
    --- Diff: 
flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala
 ---
    @@ -0,0 +1,367 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.testingUtils
    +
    +import akka.actor.{Terminated, Cancellable, ActorRef}
    +import akka.pattern.{ask, pipe}
    +import org.apache.flink.api.common.JobID
    +import org.apache.flink.runtime.FlinkActor
    +import org.apache.flink.runtime.execution.ExecutionState
    +import org.apache.flink.runtime.jobgraph.JobStatus
    +import org.apache.flink.runtime.jobmanager.JobManager
    +import 
org.apache.flink.runtime.messages.ExecutionGraphMessages.JobStatusChanged
    +import org.apache.flink.runtime.messages.JobManagerMessages.GrantLeadership
    +import org.apache.flink.runtime.messages.Messages.{Acknowledge, Disconnect}
    +import 
org.apache.flink.runtime.messages.RegistrationMessages.RegisterTaskManager
    +import org.apache.flink.runtime.messages.TaskManagerMessages.Heartbeat
    +import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages._
    +import 
org.apache.flink.runtime.testingUtils.TestingMessages.{DisableDisconnect,
    +CheckIfJobRemoved, Alive}
    +import 
org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages.AccumulatorsChanged
    +
    +import scala.collection.mutable
    +import scala.concurrent.Future
    +import scala.concurrent.duration._
    +
    +import language.postfixOps
    +
    +/** This mixin can be used to decorate a JobManager with messages for 
testing purpose.
    +  *
    +  */
    +trait TestingJobManagerLike extends FlinkActor {
    +  that: JobManager =>
    +
    +  import scala.collection.JavaConverters._
    +  import context._
    +
    +  val waitForAllVerticesToBeRunning = 
scala.collection.mutable.HashMap[JobID, Set[ActorRef]]()
    +  val waitForTaskManagerToBeTerminated = 
scala.collection.mutable.HashMap[String, Set[ActorRef]]()
    +
    +  val waitForAllVerticesToBeRunningOrFinished =
    +    scala.collection.mutable.HashMap[JobID, Set[ActorRef]]()
    +
    +  var periodicCheck: Option[Cancellable] = None
    +
    +  val waitForJobStatus = scala.collection.mutable.HashMap[JobID,
    +    collection.mutable.HashMap[JobStatus, Set[ActorRef]]]()
    +
    +  val waitForAccumulatorUpdate = scala.collection.mutable.HashMap[JobID, 
(Boolean, Set[ActorRef])]()
    +
    +  val waitForLeader = scala.collection.mutable.HashSet[ActorRef]()
    +
    +  val waitForNumRegisteredTaskManagers = mutable.PriorityQueue.newBuilder(
    +    new Ordering[(Int, ActorRef)] {
    +      override def compare(x: (Int, ActorRef), y: (Int, ActorRef)): Int = 
y._1 - x._1
    +    })
    +
    +  var disconnectDisabled = false
    +
    +  abstract override def handleMessage: Receive = {
    +    handleTestingMessage orElse super.handleMessage
    +  }
    +
    +  def handleTestingMessage: Receive = {
    +    case Alive => sender() ! Acknowledge
    +
    +    case RequestExecutionGraph(jobID) =>
    +      currentJobs.get(jobID) match {
    +        case Some((executionGraph, jobInfo)) => sender() ! decorateMessage(
    +          ExecutionGraphFound(
    +            jobID,
    +            executionGraph)
    +        )
    +
    +        case None => 
archive.tell(decorateMessage(RequestExecutionGraph(jobID)), sender())
    +      }
    +
    +    case WaitForAllVerticesToBeRunning(jobID) =>
    +      if(checkIfAllVerticesRunning(jobID)){
    +        sender() ! decorateMessage(AllVerticesRunning(jobID))
    +      }else{
    +        val waiting = waitForAllVerticesToBeRunning.getOrElse(jobID, 
Set[ActorRef]())
    +        waitForAllVerticesToBeRunning += jobID -> (waiting + sender())
    +
    +        if(periodicCheck.isEmpty){
    +          periodicCheck =
    +            Some(
    +              context.system.scheduler.schedule(
    +                0 seconds,
    +                200 millis,
    +                self,
    +                decorateMessage(NotifyListeners)
    +              )
    +            )
    +        }
    +      }
    +    case WaitForAllVerticesToBeRunningOrFinished(jobID) =>
    +      if(checkIfAllVerticesRunningOrFinished(jobID)){
    +        sender() ! decorateMessage(AllVerticesRunning(jobID))
    +      }else{
    +        val waiting = 
waitForAllVerticesToBeRunningOrFinished.getOrElse(jobID, Set[ActorRef]())
    +        waitForAllVerticesToBeRunningOrFinished += jobID -> (waiting + 
sender())
    +
    +        if(periodicCheck.isEmpty){
    +          periodicCheck =
    +            Some(
    +              context.system.scheduler.schedule(
    +                0 seconds,
    +                200 millis,
    +                self,
    +                decorateMessage(NotifyListeners)
    +              )
    +            )
    +        }
    +      }
    +
    +    case NotifyListeners =>
    +      for(jobID <- currentJobs.keySet){
    +        notifyListeners(jobID)
    +      }
    +
    +      if(waitForAllVerticesToBeRunning.isEmpty && 
waitForAllVerticesToBeRunningOrFinished.isEmpty) {
    +        periodicCheck foreach { _.cancel() }
    +        periodicCheck = None
    +      }
    +
    +
    +    case NotifyWhenJobRemoved(jobID) =>
    +      val gateways = 
instanceManager.getAllRegisteredInstances.asScala.map(_.getActorGateway)
    +
    +      val responses = gateways.map{
    +        gateway => gateway.ask(NotifyWhenJobRemoved(jobID), 
timeout).mapTo[Boolean]
    +      }
    +
    +      val jobRemovedOnJobManager = (self ? 
CheckIfJobRemoved(jobID))(timeout).mapTo[Boolean]
    +
    +      val allFutures = responses ++ Seq(jobRemovedOnJobManager)
    +
    +      import context.dispatcher
    +      Future.fold(allFutures)(true)(_ & _) map(decorateMessage(_)) pipeTo 
sender()
    +
    +    case CheckIfJobRemoved(jobID) =>
    +      if(currentJobs.contains(jobID)) {
    +        context.system.scheduler.scheduleOnce(
    +          200 milliseconds,
    +          self,
    +          decorateMessage(CheckIfJobRemoved(jobID))
    +        )(context.dispatcher, sender())
    +      } else {
    +        sender() ! decorateMessage(true)
    +      }
    +
    +    case NotifyWhenTaskManagerTerminated(taskManager) =>
    +      val waiting = 
waitForTaskManagerToBeTerminated.getOrElse(taskManager.path.name, Set())
    +      waitForTaskManagerToBeTerminated += taskManager.path.name -> 
(waiting + sender)
    +
    +    case msg@Terminated(taskManager) =>
    +      super.handleMessage(msg)
    +
    +      waitForTaskManagerToBeTerminated.remove(taskManager.path.name) 
foreach {
    +        _ foreach {
    +          listener =>
    +            listener ! decorateMessage(TaskManagerTerminated(taskManager))
    +        }
    +      }
    +
    +    case NotifyWhenAccumulatorChange(jobID) =>
    +
    +      val (updated, registered) = waitForAccumulatorUpdate.
    +        getOrElse(jobID, (false, Set[ActorRef]()))
    +      waitForAccumulatorUpdate += jobID -> (updated, registered + sender)
    +      sender ! true
    +
    +    /**
    +     * Notification from the task manager that changed accumulator are 
transferred on next
    +     * Hearbeat. We need to keep this state to notify the listeners on 
next Heartbeat report.
    +     */
    +    case AccumulatorsChanged(jobID: JobID) =>
    +      waitForAccumulatorUpdate.get(jobID) match {
    +        case Some((updated, registered)) =>
    +          waitForAccumulatorUpdate.put(jobID, (true, registered))
    +        case None =>
    +      }
    +
    +    /**
    +     * Disabled async processing of accumulator values and send 
accumulators to the listeners if
    +     * we previously received an [[AccumulatorsChanged]] message.
    +     */
    +    case msg : Heartbeat =>
    +      super.handleMessage(msg)
    +
    +      waitForAccumulatorUpdate foreach {
    +        case (jobID, (updated, actors)) if updated =>
    +          currentJobs.get(jobID) match {
    +            case Some((graph, jobInfo)) =>
    +              val flinkAccumulators = graph.getFlinkAccumulators
    +              val userAccumulators = graph.aggregateUserAccumulators
    +              actors foreach {
    +                actor => actor ! UpdatedAccumulators(jobID, 
flinkAccumulators, userAccumulators)
    +              }
    +            case None =>
    +          }
    +          waitForAccumulatorUpdate.put(jobID, (false, actors))
    +        case _ =>
    +      }
    +
    +    case RequestWorkingTaskManager(jobID) =>
    +      currentJobs.get(jobID) match {
    +        case Some((eg, _)) =>
    +          if(eg.getAllExecutionVertices.asScala.isEmpty){
    +            sender ! decorateMessage(WorkingTaskManager(None))
    +          } else {
    +            val resource = 
eg.getAllExecutionVertices.asScala.head.getCurrentAssignedResource
    +
    +            if(resource == null){
    +              sender ! decorateMessage(WorkingTaskManager(None))
    +            } else {
    +              sender ! decorateMessage(
    +                WorkingTaskManager(
    +                  Some(resource.getInstance().getActorGateway)
    +                )
    +              )
    +            }
    +          }
    +        case None => sender ! decorateMessage(WorkingTaskManager(None))
    +      }
    +
    --- End diff --
    
    fixed


> Add high availability support for Yarn
> --------------------------------------
>
>                 Key: FLINK-2790
>                 URL: https://issues.apache.org/jira/browse/FLINK-2790
>             Project: Flink
>          Issue Type: Sub-task
>          Components: JobManager, TaskManager
>            Reporter: Till Rohrmann
>             Fix For: 0.10
>
>
> Add master high availability support for Yarn. The idea is to let Yarn 
> restart a failed application master in a new container. For that, we set the 
> number of application retries to something greater than 1. 
> From version 2.4.0 onwards, it is possible to reuse already started 
> containers for the TaskManagers, thus, avoiding unnecessary restart delays.
> From version 2.6.0 onwards, it is possible to specify an interval in which 
> the number of application attempts have to be exceeded in order to fail the 
> job. This will prevent long running jobs from eventually depleting all 
> available application attempts.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to