This is an automated email from the ASF dual-hosted git repository.

style95 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new 99fb2c5  Implement ActivationClientProxy (#5119)
99fb2c5 is described below

commit 99fb2c59e264d820d6b95a6319b7882f19aead4d
Author: ningyougang <[email protected]>
AuthorDate: Mon Jun 7 12:51:51 2021 +0800

    Implement ActivationClientProxy (#5119)
    
    ActivationClientProxy actor fetches activationMessage from scheudler side
    using akka grpc, and forwards the activationMessage to its parent actor
    FunctionPullContainerProxy actor.
---
 .../apache/openwhisk/common/TransactionId.scala    |   2 +
 .../containerpool/v2/ActivationClientProxy.scala   | 397 +++++++++++++++++-
 core/scheduler/src/main/protobuf/activation.proto  |  15 +-
 .../scheduler/grpc/ActivationServiceImpl.scala     |  70 ++--
 .../v2/test/ActivationClientProxyTests.scala       | 459 +++++++++++++++++++++
 .../grpc/test/ActivationServiceImplTests.scala     |  85 +++-
 6 files changed, 969 insertions(+), 59 deletions(-)

diff --git 
a/common/scala/src/main/scala/org/apache/openwhisk/common/TransactionId.scala 
b/common/scala/src/main/scala/org/apache/openwhisk/common/TransactionId.scala
index d3389b8..de9c8c0 100644
--- 
a/common/scala/src/main/scala/org/apache/openwhisk/common/TransactionId.scala
+++ 
b/common/scala/src/main/scala/org/apache/openwhisk/common/TransactionId.scala
@@ -191,6 +191,8 @@ case class TransactionId private (meta: 
TransactionMetadata) extends AnyVal {
       case Some(parent) => findRoot(parent)
       case _            => meta
     }
+
+  def serialize = TransactionId.serdes.write(this).compactPrint
 }
 
 /**
diff --git 
a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/ActivationClientProxy.scala
 
b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/ActivationClientProxy.scala
index 71bc5fa..32f33e7 100644
--- 
a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/ActivationClientProxy.scala
+++ 
b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/ActivationClientProxy.scala
@@ -17,15 +17,28 @@
 
 package org.apache.openwhisk.core.containerpool.v2
 
-import akka.actor.ActorRef
+import akka.actor.Status.{Failure => FailureMessage}
+import akka.actor.{ActorRef, ActorSystem, FSM, Props, Stash}
+import akka.grpc.internal.ClientClosedException
+import akka.pattern.pipe
+import akka.stream.ActorMaterializer
+import io.grpc.StatusRuntimeException
+import org.apache.openwhisk.common.{Logging, TransactionId}
 import org.apache.openwhisk.core.connector.ActivationMessage
-import org.apache.openwhisk.core.entity.{DocRevision, FullyQualifiedEntityName}
+import org.apache.openwhisk.core.containerpool.ContainerId
+import org.apache.openwhisk.core.entity._
 import org.apache.openwhisk.core.scheduler.SchedulerEndpoints
+import org.apache.openwhisk.core.scheduler.grpc.ActivationResponse
+import org.apache.openwhisk.core.scheduler.queue.{ActionMismatch, 
MemoryQueueError, NoActivationMessage, NoMemoryQueue}
+import org.apache.openwhisk.grpc.{ActivationServiceClient, FetchRequest, 
RescheduleRequest, RescheduleResponse}
+import spray.json.JsonParser.ParsingException
+
+import scala.concurrent.Future
+import scala.util.{Success, Try}
 
 // Event send by the actor
 case class ClientCreationCompleted(client: Option[ActorRef] = None)
 case object ClientClosed
-case object CloseClientProxy
 
 // Event received by the actor
 case object StartClient
@@ -34,10 +47,382 @@ case class RescheduleActivation(invocationNamespace: 
String,
                                 fqn: FullyQualifiedEntityName,
                                 rev: DocRevision,
                                 msg: ActivationMessage)
-
 case object RetryRequestActivation
 case object ContainerWarmed
+case object CloseClientProxy
 case object StopClientProxy
 
-// TODO, use grpc to fetch activation from memoryQueue
-class ActivationClientProxy {}
+// state
+sealed trait ActivationClientProxyState
+case object ClientProxyUninitialized extends ActivationClientProxyState
+case object ClientProxyReady extends ActivationClientProxyState
+case object ClientProxyRemoving extends ActivationClientProxyState
+
+// data
+sealed trait ActivationClientProxyData
+case class Client(activationClient: ActivationServiceClient, rpcHost: String, 
rpcPort: Int)
+    extends ActivationClientProxyData
+case class Retry(count: Int) extends ActivationClientProxyData
+
+class ActivationClientProxy(
+  invocationNamespace: String,
+  action: FullyQualifiedEntityName,
+  rev: DocRevision,
+  schedulerHost: String,
+  rpcPort: Int,
+  containerId: ContainerId,
+  activationClientFactory: (String, FullyQualifiedEntityName, String, Int, 
Boolean) => Future[ActivationServiceClient])(
+  implicit actorSystem: ActorSystem,
+  mat: ActorMaterializer,
+  logging: Logging)
+    extends FSM[ActivationClientProxyState, ActivationClientProxyData]
+    with Stash {
+
+  implicit val ec = actorSystem.dispatcher
+
+  private var warmed = false
+
+  startWith(ClientProxyUninitialized, Retry(3))
+
+  when(ClientProxyUninitialized) {
+    case Event(StartClient, r: Retry) =>
+      // build activation client using original scheduler endpoint firstly
+      createActivationClient(invocationNamespace, action, schedulerHost, 
rpcPort, tryOtherScheduler = false)
+        .pipeTo(self)
+
+      stay using r
+
+    case Event(client: ActivationClient, _) =>
+      context.parent ! ClientCreationCompleted()
+
+      goto(ClientProxyReady) using Client(client.client, client.rpcHost, 
client.rpcPort)
+
+    case Event(f: FailureMessage, _) =>
+      logging.error(this, s"failed to create grpc client for ${action} caused 
by: $f")
+      self ! ClientClosed
+
+      goto(ClientProxyRemoving)
+
+    case _ => delay
+  }
+
+  when(ClientProxyReady) {
+    case Event(request: RequestActivation, client: Client) =>
+      request.newScheduler match {
+        // if scheduler is changed, client needs to be recreated
+        case Some(scheduler) if scheduler.host != client.rpcHost || 
scheduler.rpcPort != client.rpcPort =>
+          val newHost = request.newScheduler.get.host
+          val newPort = request.newScheduler.get.rpcPort
+          client.activationClient
+            .close()
+            .flatMap(_ =>
+              createActivationClient(invocationNamespace, action, newHost, 
newPort, tryOtherScheduler = false))
+            .pipeTo(self)
+
+        case _ =>
+          requestActivationMessage(invocationNamespace, action, rev, 
client.activationClient, request.lastDuration)
+            .pipeTo(self)
+      }
+      stay()
+
+    case Event(e: RescheduleActivation, client: Client) =>
+      logging.info(this, s"got a reschedule message ${e.msg.activationId} for 
action: ${e.msg.action}")
+      client.activationClient
+        .rescheduleActivation(
+          RescheduleRequest(e.invocationNamespace, e.fqn.serialize, 
e.rev.serialize, e.msg.serialize))
+        .recover {
+          case t =>
+            logging.error(this, s"Failed to reschedule activation (error: $t)")
+            Future.successful(RescheduleResponse())
+        }
+        .foreach(res => {
+          context.parent ! res
+        })
+      stay()
+
+    case Event(msg: ActivationMessage, _: Client) =>
+      logging.debug(this, s"got a message ${msg.activationId} for action: 
${msg.action}")
+      context.parent ! msg
+
+      stay()
+
+    /**
+     * Case of scheduler error
+     */
+    case Event(error: MemoryQueueError, c: Client) =>
+      error match {
+        case _: NoMemoryQueue =>
+          logging.error(
+            this,
+            s"The queue of action ${action} under invocationNamespace 
${invocationNamespace} does not exist. Check for queues in other schedulers.")
+          c.activationClient
+            .close()
+            .flatMap(_ =>
+              createActivationClient(invocationNamespace, action, c.rpcHost, 
c.rpcPort, tryOtherScheduler = true))
+            .pipeTo(self)
+
+          stay()
+
+        case _: ActionMismatch =>
+          logging.error(this, s"action version does not match: $action")
+          c.activationClient.close().andThen {
+            case _ => self ! ClientClosed
+          }
+
+          goto(ClientProxyRemoving)
+
+        case _: NoActivationMessage => // retry
+          logging.debug(this, s"no activation message exist: $action")
+          context.parent ! RetryRequestActivation
+
+          stay()
+      }
+
+    /**
+     * Case of system error like grpc, parsing message
+     */
+    case Event(f: FailureMessage, c: Client) =>
+      f.cause match {
+        case t: ParsingException =>
+          logging.error(this, s"failed to parse activation message: $t")
+          context.parent ! RetryRequestActivation
+
+          stay()
+
+        // When scheduler pod recreated, the StatusRuntimeException with 
`Unable to resolve host` would happen.
+        // In such situation, it is better to stop the activationClientProxy, 
otherwise, in short time,
+        // it would print huge log due to create another grpcClient to fetch 
activation again.
+        case t: StatusRuntimeException if 
t.getMessage.contains(ActivationClientProxy.hostResolveError) =>
+          logging.error(this, s"akka grpc server connection failed: $t")
+          self ! ClientClosed
+
+          goto(ClientProxyRemoving)
+
+        case t: StatusRuntimeException =>
+          logging.error(this, s"akka grpc server connection failed: $t")
+          c.activationClient
+            .close()
+            .flatMap(_ =>
+              createActivationClient(invocationNamespace, action, c.rpcHost, 
c.rpcPort, tryOtherScheduler = true))
+            .pipeTo(self)
+
+          stay()
+
+        case _: ClientClosedException =>
+          logging.error(this, s"grpc client is already closed for $action")
+          self ! ClientClosed
+
+          goto(ClientProxyRemoving)
+
+        case t: Throwable =>
+          logging.error(this, s"get activation from remote server error: $t")
+          safelyCloseClient(c)
+          goto(ClientProxyRemoving)
+      }
+
+    case Event(client: ActivationClient, _) =>
+      // long poll
+      requestActivationMessage(invocationNamespace, action, rev, client.client)
+        .pipeTo(self)
+
+      stay using Client(client.client, client.rpcHost, client.rpcPort)
+  }
+
+  when(ClientProxyRemoving) {
+    case Event(request: RequestActivation, client: Client) =>
+      request.newScheduler match {
+        // if scheduler is changed, client needs to be recreated
+        case Some(scheduler) if scheduler.host != client.rpcHost || 
scheduler.rpcPort != client.rpcPort =>
+          val newHost = request.newScheduler.get.host
+          val newPort = request.newScheduler.get.rpcPort
+          client.activationClient
+            .close()
+            .flatMap(_ =>
+              createActivationClient(invocationNamespace, action, newHost, 
newPort, tryOtherScheduler = false))
+            .pipeTo(self)
+
+        case _ =>
+          requestActivationMessage(invocationNamespace, action, rev, 
client.activationClient, request.lastDuration)
+            .pipeTo(self)
+      }
+      stay()
+
+    case Event(msg: ActivationMessage, _: Client) =>
+      context.parent ! msg
+
+      stay()
+
+    case Event(_: MemoryQueueError, _: Client) =>
+      self ! ClientClosed
+
+      stay()
+
+    case Event(f: FailureMessage, c: Client) =>
+      logging.error(this, s"some error happened for action: ${action} in 
state: $stateName, caused by: $f")
+      safelyCloseClient(c)
+      stay()
+
+    case Event(client: ActivationClient, _) =>
+      // long poll
+      requestActivationMessage(invocationNamespace, action, rev, client.client)
+        .pipeTo(self)
+
+      stay using Client(client.client, client.rpcHost, client.rpcPort)
+  }
+
+  // Unstash all messages stashed while in intermediate state
+  onTransition {
+    case _ -> ClientProxyReady    => unstashAll()
+    case _ -> ClientProxyRemoving => unstashAll()
+  }
+
+  whenUnhandled {
+    case Event(ContainerWarmed, _) =>
+      warmed = true
+      stay
+
+    case Event(CloseClientProxy, c: Client) =>
+      safelyCloseClient(c)
+      goto(ClientProxyRemoving)
+
+    case Event(ClientClosed, _) =>
+      context.parent ! ClientClosed
+
+      stop()
+
+    case Event(StopClientProxy, c: Client) =>
+      safelyCloseClient(c)
+      stay()
+  }
+
+  initialize()
+
+  /** Delays all incoming messages until unstashAll() is called */
+  def delay = {
+    stash()
+    stay
+  }
+
+  /**
+   * Safely shut down the client.
+   */
+  private def safelyCloseClient(client: Client): Unit = {
+    Try {
+      client.activationClient
+        .fetchActivation(
+          FetchRequest(
+            TransactionId(TransactionId.generateTid()).serialize,
+            invocationNamespace,
+            action.serialize,
+            rev.serialize,
+            containerId.asString,
+            warmed,
+            None,
+            false))
+        .andThen {
+          case _ =>
+            client.activationClient.close().andThen {
+              case _ => self ! ClientClosed
+            }
+        }
+    }.recover {
+      // If the fetchActivation is executed when the client is closed, the 
andThen statement is not executed.
+      case _: ClientClosedException =>
+        self ! ClientClosed
+    }
+  }
+
+  /**
+   * Request activation message to scheduler by long poll
+   *
+   * @return ActivationMessage or MemoryQueueError
+   */
+  private def requestActivationMessage(invocationNamespace: String,
+                                       fqn: FullyQualifiedEntityName,
+                                       rev: DocRevision,
+                                       client: ActivationServiceClient,
+                                       lastDuration: Option[Long] = None) = {
+    Try {
+      client
+        .fetchActivation(
+          FetchRequest(
+            TransactionId(TransactionId.generateTid()).serialize,
+            invocationNamespace,
+            fqn.serialize,
+            rev.serialize,
+            containerId.asString,
+            warmed,
+            lastDuration,
+            true))
+        .flatMap { r =>
+          Future(ActivationResponse.parse(r.activationMessage))
+            .flatMap(Future.fromTry)
+            .flatMap {
+              case ActivationResponse(Right(msg)) =>
+                Future.successful(msg)
+              case ActivationResponse(Left(msg)) =>
+                Future.successful(msg)
+            }
+        }
+    }.recover {
+        case _: ClientClosedException =>
+          logging.debug(this, s"grpc client is closed for $fqn in the Try 
closure")
+          Future.successful(ClientClosed)
+      }
+      .getOrElse(Future.failed(new Exception(s"error to get $fqn activation 
from grpc server")))
+  }
+
+  private def createActivationClient(invocationNamespace: String,
+                                     fqn: FullyQualifiedEntityName,
+                                     schedulerHost: String,
+                                     rpcPort: Int,
+                                     tryOtherScheduler: Boolean,
+                                     retry: Int = 5): Future[ActivationClient] 
= {
+    activationClientFactory(invocationNamespace, fqn, schedulerHost, rpcPort, 
tryOtherScheduler)
+      .map { client =>
+        ActivationClient(client, schedulerHost, rpcPort)
+      }
+      .andThen {
+        case Success(_) => logging.debug(this, "The gRPC client created 
successfully")
+      }
+      .recoverWith {
+        case _: Throwable =>
+          if (retry < 5)
+            createActivationClient(invocationNamespace, action, schedulerHost, 
rpcPort, tryOtherScheduler, retry - 1)
+          else {
+            Future.failed(new Exception("The number of client creation retries 
has been exceeded."))
+          }
+      }
+  }
+}
+
+object ActivationClientProxy {
+
+  val hostResolveError = "Unable to resolve host"
+
+  def props(invocationNamespace: String,
+            action: FullyQualifiedEntityName,
+            rev: DocRevision,
+            schedulerHost: String,
+            rpcPort: Int,
+            containerId: ContainerId,
+            activationClientFactory: (String,
+                                      FullyQualifiedEntityName,
+                                      String,
+                                      Int,
+                                      Boolean) => 
Future[ActivationServiceClient])(implicit actorSystem: ActorSystem,
+                                                                               
    mat: ActorMaterializer,
+                                                                               
    logging: Logging) = {
+    Props(
+      new ActivationClientProxy(
+        invocationNamespace,
+        action,
+        rev,
+        schedulerHost,
+        rpcPort,
+        containerId,
+        activationClientFactory))
+  }
+}
+
+case class ActivationClient(client: ActivationServiceClient, rpcHost: String, 
rpcPort: Int)
diff --git a/core/scheduler/src/main/protobuf/activation.proto 
b/core/scheduler/src/main/protobuf/activation.proto
index fb16f48..e79dfda 100644
--- a/core/scheduler/src/main/protobuf/activation.proto
+++ b/core/scheduler/src/main/protobuf/activation.proto
@@ -36,15 +36,16 @@ service ActivationService {
 //#messages
 // The request message
 message FetchRequest {
-    string invocationNamespace = 1;
-    string fqn = 2;
-    string rev = 3;
-    string containerId = 4;
-    bool warmed = 5;
+    string transactionId = 1;
+    string invocationNamespace = 2;
+    string fqn = 3;
+    string rev = 4;
+    string containerId = 5;
+    bool warmed = 6;
     // This allows optional value
-    google.protobuf.Int64Value lastDuration = 6;
+    google.protobuf.Int64Value lastDuration = 7;
     // to record alive containers
-    bool alive = 7;
+    bool alive = 8;
 }
 
 // The response message
diff --git 
a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/grpc/ActivationServiceImpl.scala
 
b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/grpc/ActivationServiceImpl.scala
index d80cd42..acf311e 100644
--- 
a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/grpc/ActivationServiceImpl.scala
+++ 
b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/grpc/ActivationServiceImpl.scala
@@ -20,7 +20,8 @@ package org.apache.openwhisk.core.scheduler.grpc
 import akka.actor.ActorSystem
 import akka.pattern.ask
 import akka.util.Timeout
-import org.apache.openwhisk.common.Logging
+import org.apache.openwhisk.common.{Logging, TransactionId}
+import org.apache.openwhisk.core.WarmUp
 import org.apache.openwhisk.core.connector.{ActivationMessage, Message}
 import org.apache.openwhisk.core.entity.{DocRevision, FullyQualifiedEntityName}
 import org.apache.openwhisk.core.scheduler.queue._
@@ -51,7 +52,7 @@ class ActivationServiceImpl()(implicit actorSystem: 
ActorSystem, logging: Loggin
               this,
               s"Enqueue activation message to reschedule 
${request.invocationNamespace} ${request.fqn} ${request.rev}")
             queueValue.queue ? res._3
-            Future.successful(RescheduleResponse(isRescheduled = true))
+            Future.successful(RescheduleResponse(true))
           case None =>
             logging.error(this, s"Queue not found for 
${request.invocationNamespace} ${request.fqn} ${request.rev}")
             Future.successful(RescheduleResponse())
@@ -65,31 +66,43 @@ class ActivationServiceImpl()(implicit actorSystem: 
ActorSystem, logging: Loggin
       fqn <- FullyQualifiedEntityName.parse(request.fqn)
       rev <- DocRevision.parse(request.rev)
     } yield (fqn, rev)).flatMap(Future.fromTry) flatMap { res =>
-      val key = res._1.toDocId.asDocInfo(res._2)
-      QueuePool.get(MemoryQueueKey(request.invocationNamespace, key)) match {
-        case Some(queueValue) =>
-          (queueValue.queue ? GetActivation(
-            res._1,
-            request.containerId,
-            request.warmed,
-            request.lastDuration,
-            request.alive))
-            .mapTo[ActivationResponse]
-            .map { response =>
-              FetchResponse(response.serialize)
-            }
-            .recover {
-              case t: Throwable =>
-                logging.error(this, s"Failed to get message from QueueManager, 
error: ${t.getMessage}")
-                
FetchResponse(ActivationResponse(Left(NoActivationMessage())).serialize)
-            }
-        case None =>
-          if (QueuePool.keys.exists { mkey =>
-                mkey.invocationNamespace == request.invocationNamespace && 
mkey.docInfo.id == key.id
-              })
-            
Future.successful(FetchResponse(ActivationResponse(Left(ActionMismatch())).serialize))
-          else
-            
Future.successful(FetchResponse(ActivationResponse(Left(NoMemoryQueue())).serialize))
+      val (fqn, rev) = res
+      if (!WarmUp.isWarmUpAction(fqn)) {
+        val key = fqn.toDocId.asDocInfo(rev)
+        QueuePool.get(MemoryQueueKey(request.invocationNamespace, key)) match {
+          case Some(queueValue) =>
+            implicit val transid = 
TransactionId.serdes.read(request.transactionId.parseJson)
+            (queueValue.queue ? GetActivation(
+              transid,
+              fqn,
+              request.containerId,
+              request.warmed,
+              request.lastDuration,
+              request.alive))
+              .mapTo[ActivationResponse]
+              .map { response =>
+                FetchResponse(response.serialize)
+              }
+              .recover {
+                case t: Throwable =>
+                  logging.error(
+                    this,
+                    s"Failed to get message from QueueManager container: 
${request.containerId}, fqn: ${request.fqn}, rev: ${request.rev}, alive: 
${request.alive}, lastDuration: ${request.lastDuration}, error: 
${t.getMessage}")
+                  
FetchResponse(ActivationResponse(Left(NoActivationMessage())).serialize)
+              }
+          case None =>
+            if (QueuePool.keys.exists { mkey =>
+                  mkey.invocationNamespace == request.invocationNamespace && 
mkey.docInfo.id == key.id
+                })
+              
Future.successful(FetchResponse(ActivationResponse(Left(ActionMismatch())).serialize))
+            else
+              
Future.successful(FetchResponse(ActivationResponse(Left(NoMemoryQueue())).serialize))
+        }
+      } else {
+        logging.info(
+          this,
+          s"The ${request.fqn} action is an action used to connect a network 
level connection. So response no activation")
+        
Future.successful(FetchResponse(ActivationResponse(Left(NoActivationMessage())).serialize))
       }
     }
   }
@@ -101,7 +114,8 @@ object ActivationServiceImpl {
     new ActivationServiceImpl()
 }
 
-case class GetActivation(action: FullyQualifiedEntityName,
+case class GetActivation(transactionId: TransactionId,
+                         action: FullyQualifiedEntityName,
                          containerId: String,
                          warmed: Boolean,
                          lastDuration: Option[Long],
diff --git 
a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/v2/test/ActivationClientProxyTests.scala
 
b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/v2/test/ActivationClientProxyTests.scala
new file mode 100644
index 0000000..737aa5d
--- /dev/null
+++ 
b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/v2/test/ActivationClientProxyTests.scala
@@ -0,0 +1,459 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.core.containerpool.v2.test
+
+import akka.Done
+import akka.actor.FSM.{CurrentState, SubscribeTransitionCallBack, Transition}
+import akka.actor.{ActorRef, ActorSystem}
+import akka.grpc.internal.ClientClosedException
+import akka.stream.ActorMaterializer
+import akka.testkit.{ImplicitSender, TestKit, TestProbe}
+import common.StreamLogging
+import io.grpc.StatusRuntimeException
+import org.apache.openwhisk.common.TransactionId
+import org.apache.openwhisk.core.connector.ActivationMessage
+import org.apache.openwhisk.core.containerpool.ContainerId
+import org.apache.openwhisk.core.containerpool.v2._
+import org.apache.openwhisk.core.entity.ExecManifest.{ImageName, 
RuntimeManifest}
+import org.apache.openwhisk.core.entity._
+import org.apache.openwhisk.core.scheduler.SchedulerEndpoints
+import org.apache.openwhisk.core.scheduler.grpc.{ActivationResponse => 
AResponse}
+import org.apache.openwhisk.core.scheduler.queue.{ActionMismatch, 
NoActivationMessage, NoMemoryQueue}
+import org.apache.openwhisk.grpc
+import org.apache.openwhisk.grpc.{ActivationServiceClient, FetchRequest, 
RescheduleRequest, RescheduleResponse}
+import org.junit.runner.RunWith
+import org.scalamock.scalatest.MockFactory
+import org.scalatest.junit.JUnitRunner
+import org.scalatest.{BeforeAndAfterAll, FlatSpecLike, Matchers}
+import org.scalatest.concurrent.ScalaFutures
+
+import scala.collection.mutable.ArrayBuffer
+import scala.concurrent.Future
+import scala.concurrent.duration._
+
+@RunWith(classOf[JUnitRunner])
+class ActivationClientProxyTests
+    extends TestKit(ActorSystem("ActivationClientProxy"))
+    with ImplicitSender
+    with FlatSpecLike
+    with Matchers
+    with MockFactory
+    with BeforeAndAfterAll
+    with StreamLogging
+    with ScalaFutures {
+
+  override def afterAll: Unit = TestKit.shutdownActorSystem(system)
+
+  implicit val mat = ActorMaterializer()
+  implicit val ec = system.dispatcher
+
+  val timeout = 20.seconds
+
+  val log = logging
+
+  val exec = CodeExecAsString(RuntimeManifest("actionKind", 
ImageName("testImage")), "testCode", None)
+  val action = ExecutableWhiskAction(EntityPath("actionSpace"), 
EntityName("actionName"), exec)
+  val fqn = action.fullyQualifiedName(true)
+  val rev = action.rev
+  val schedulerHost = "127.17.0.1"
+  val rpcPort = 13001
+  val containerId = ContainerId("fakeContainerId")
+  val messageTransId = TransactionId(TransactionId.testing.meta.id)
+  val invocationNamespace = EntityName("invocationSpace")
+  val uuid = UUID()
+
+  val message = ActivationMessage(
+    messageTransId,
+    action.fullyQualifiedName(true),
+    action.rev,
+    Identity(Subject(), Namespace(invocationNamespace, uuid), 
BasicAuthenticationAuthKey(uuid, Secret()), Set.empty),
+    ActivationId.generate(),
+    ControllerInstanceId("0"),
+    blocking = false,
+    content = None)
+
+  val entityStore = WhiskEntityStore.datastore()
+
+  behavior of "ActivationClientProxy"
+
+  it should "create a grpc client successfully" in within(timeout) {
+    val fetch = (_: FetchRequest) => 
Future(grpc.FetchResponse(AResponse(Right(message)).serialize))
+    val client = (_: String, _: FullyQualifiedEntityName, _: String, _: Int, 
_: Boolean) =>
+      Future(MockActivationServiceClient(fetch))
+
+    val probe = TestProbe()
+    val machine =
+      probe.childActorOf(
+        ActivationClientProxy
+          .props(invocationNamespace.asString, fqn, rev, schedulerHost, 
rpcPort, containerId, client))
+    registerCallback(machine, probe)
+
+    machine ! StartClient
+
+    probe.expectMsg(ClientCreationCompleted())
+    probe.expectMsg(Transition(machine, ClientProxyUninitialized, 
ClientProxyReady))
+  }
+
+  it should "be closed when failed to create grpc client" in within(timeout) {
+    val fetch = (_: FetchRequest) => 
Future(grpc.FetchResponse(AResponse(Right(message)).serialize))
+    val client = (_: String, _: FullyQualifiedEntityName, _: String, _: Int, 
_: Boolean) =>
+      Future {
+        throw new RuntimeException("failed to create client")
+        MockActivationServiceClient(fetch)
+    }
+
+    val probe = TestProbe()
+    val machine =
+      probe.childActorOf(
+        ActivationClientProxy
+          .props(invocationNamespace.asString, fqn, rev, schedulerHost, 
rpcPort, containerId, client))
+    registerCallback(machine, probe)
+
+    machine ! StartClient
+
+    probe.expectMsg(Transition(machine, ClientProxyUninitialized, 
ClientProxyRemoving))
+    probe.expectMsg(ClientClosed)
+
+    probe expectTerminated machine
+  }
+
+  it should "fetch activation message successfully" in within(timeout) {
+    val fetch = (_: FetchRequest) => 
Future(grpc.FetchResponse(AResponse(Right(message)).serialize))
+    val client = (_: String, _: FullyQualifiedEntityName, _: String, _: Int, 
_: Boolean) =>
+      Future(MockActivationServiceClient(fetch))
+
+    val probe = TestProbe()
+    val machine =
+      probe.childActorOf(
+        ActivationClientProxy
+          .props(invocationNamespace.asString, fqn, rev, schedulerHost, 
rpcPort, containerId, client))
+    registerCallback(machine, probe)
+    ready(machine, probe)
+
+    machine ! RequestActivation()
+    probe.expectMsg(message)
+  }
+
+  it should "be recreated when scheduler is changed" in within(timeout) {
+    var creationCount = 0
+    val fetch = (_: FetchRequest) => 
Future(grpc.FetchResponse(AResponse(Left(NoMemoryQueue())).serialize))
+    val client = (_: String, _: FullyQualifiedEntityName, _: String, _: Int, 
_: Boolean) => {
+      creationCount += 1
+      Future(MockActivationServiceClient(fetch))
+    }
+
+    val probe = TestProbe()
+    val machine =
+      probe.childActorOf(
+        ActivationClientProxy
+          .props(invocationNamespace.asString, fqn, rev, schedulerHost, 
rpcPort, containerId, client))
+    registerCallback(machine, probe)
+    ready(machine, probe)
+
+    // new scheduler is reached
+    machine ! RequestActivation(newScheduler = 
Some(SchedulerEndpoints("0.0.0.0", 10, 11)))
+
+    awaitAssert {
+      creationCount should be > 1
+    }
+  }
+
+  it should "be recreated when the queue does not exist" in within(timeout) {
+    var creationCount = 0
+    val fetch = (_: FetchRequest) => 
Future(grpc.FetchResponse(AResponse(Left(NoMemoryQueue())).serialize))
+    val client = (_: String, _: FullyQualifiedEntityName, _: String, _: Int, 
_: Boolean) => {
+      creationCount += 1
+      Future(MockActivationServiceClient(fetch))
+    }
+
+    val probe = TestProbe()
+    val machine =
+      probe.childActorOf(
+        ActivationClientProxy
+          .props(invocationNamespace.asString, fqn, rev, schedulerHost, 
rpcPort, containerId, client))
+    registerCallback(machine, probe)
+    ready(machine, probe)
+
+    machine ! RequestActivation()
+
+    awaitAssert {
+      creationCount should be > 1
+    }
+  }
+
+  it should "be closed when the action version does not match" in 
within(timeout) {
+    val fetch = (_: FetchRequest) => 
Future(grpc.FetchResponse(AResponse(Left(ActionMismatch())).serialize))
+    val client = (_: String, _: FullyQualifiedEntityName, _: String, _: Int, 
_: Boolean) =>
+      Future(MockActivationServiceClient(fetch))
+
+    val probe = TestProbe()
+    val machine =
+      probe.childActorOf(
+        ActivationClientProxy
+          .props(invocationNamespace.asString, fqn, rev, schedulerHost, 
rpcPort, containerId, client))
+    registerCallback(machine, probe)
+    ready(machine, probe)
+
+    machine ! RequestActivation()
+    probe.expectMsg(Transition(machine, ClientProxyReady, ClientProxyRemoving))
+    probe.expectMsg(ClientClosed)
+
+    probe expectTerminated machine
+  }
+
+  it should "retry to request activation message when scheduler response no 
activation message" in within(timeout) {
+    val fetch = (_: FetchRequest) => 
Future(grpc.FetchResponse(AResponse(Left(NoActivationMessage())).serialize))
+    val client = (_: String, _: FullyQualifiedEntityName, _: String, _: Int, 
_: Boolean) =>
+      Future(MockActivationServiceClient(fetch))
+
+    val probe = TestProbe()
+    val machine =
+      probe.childActorOf(
+        ActivationClientProxy
+          .props(invocationNamespace.asString, fqn, rev, schedulerHost, 
rpcPort, containerId, client))
+    registerCallback(machine, probe)
+    ready(machine, probe)
+
+    machine ! RequestActivation()
+    probe.expectMsg(RetryRequestActivation)
+  }
+
+  it should "create activation client on other scheduler when the queue does 
not exist" in within(timeout) {
+    val createClientOnOtherScheduler = new ArrayBuffer[Boolean]()
+    val fetch = (_: FetchRequest) => 
Future(grpc.FetchResponse(AResponse(Left(NoMemoryQueue())).serialize))
+    val client = (_: String, _: FullyQualifiedEntityName, _: String, _: Int, 
tryOtherScheduler: Boolean) => {
+      createClientOnOtherScheduler += tryOtherScheduler
+      Future(MockActivationServiceClient(fetch))
+    }
+
+    val probe = TestProbe()
+    val machine =
+      probe.childActorOf(
+        ActivationClientProxy
+          .props(invocationNamespace.asString, fqn, rev, schedulerHost, 
rpcPort, containerId, client))
+    registerCallback(machine, probe)
+    ready(machine, probe)
+
+    machine ! RequestActivation()
+
+    awaitAssert {
+      // Create activation client using original scheduler endpoint firstly
+      createClientOnOtherScheduler(0) shouldBe false
+      // Create activation client using latest scheduler endpoint(try other 
scheduler) when no memoryQueue
+      createClientOnOtherScheduler(1) shouldBe true
+    }
+  }
+
+  it should "request activation message when the message can't deserialize" in 
within(timeout) {
+    val fetch = (_: FetchRequest) => Future(grpc.FetchResponse("aaaaaa"))
+    val client = (_: String, _: FullyQualifiedEntityName, _: String, _: Int, 
_: Boolean) =>
+      Future(MockActivationServiceClient(fetch))
+
+    val probe = TestProbe()
+    val machine =
+      probe.childActorOf(
+        ActivationClientProxy
+          .props(invocationNamespace.asString, fqn, rev, schedulerHost, 
rpcPort, containerId, client))
+    registerCallback(machine, probe)
+    ready(machine, probe)
+
+    machine ! RequestActivation()
+    probe.expectMsg(RetryRequestActivation)
+  }
+
+  it should "be recreated when akka grpc server connection failed" in 
within(timeout) {
+    var creationCount = 0
+    val fetch = (_: FetchRequest) =>
+      Future {
+        throw new StatusRuntimeException(io.grpc.Status.UNAVAILABLE)
+        grpc.FetchResponse(AResponse(Right(message)).serialize)
+    }
+    val client = (_: String, _: FullyQualifiedEntityName, _: String, _: Int, 
_: Boolean) => {
+      creationCount += 1
+      Future(MockActivationServiceClient(fetch))
+    }
+
+    val probe = TestProbe()
+    val machine =
+      probe.childActorOf(
+        ActivationClientProxy
+          .props(invocationNamespace.asString, fqn, rev, schedulerHost, 
rpcPort, containerId, client))
+    registerCallback(machine, probe)
+    ready(machine, probe)
+
+    machine ! RequestActivation()
+
+    awaitAssert {
+      creationCount should be > 1
+    }
+  }
+
+  it should "be closed when grpc client is already closed" in within(timeout) {
+    val fetch = (_: FetchRequest) =>
+      Future {
+        throw new ClientClosedException()
+        grpc.FetchResponse(AResponse(Right(message)).serialize)
+    }
+    val client = (_: String, _: FullyQualifiedEntityName, _: String, _: Int, 
_: Boolean) =>
+      Future(MockActivationServiceClient(fetch))
+
+    val probe = TestProbe()
+    val machine =
+      probe.childActorOf(
+        ActivationClientProxy
+          .props(invocationNamespace.asString, fqn, rev, schedulerHost, 
rpcPort, containerId, client))
+    registerCallback(machine, probe)
+    ready(machine, probe)
+
+    machine ! RequestActivation()
+    probe.expectMsg(Transition(machine, ClientProxyReady, ClientProxyRemoving))
+    probe.expectMsg(ClientClosed)
+
+    probe expectTerminated machine
+  }
+
+  it should "be closed when it failed to getting activation from scheduler" in 
within(timeout) {
+    val fetch = (_: FetchRequest) =>
+      Future {
+        throw new Exception("Unknown exception")
+        grpc.FetchResponse(AResponse(Right(message)).serialize)
+    }
+    val client = (_: String, _: FullyQualifiedEntityName, _: String, _: Int, 
_: Boolean) =>
+      Future(MockActivationServiceClient(fetch))
+
+    val probe = TestProbe()
+    val machine =
+      probe.childActorOf(
+        ActivationClientProxy
+          .props(invocationNamespace.asString, fqn, rev, schedulerHost, 
rpcPort, containerId, client))
+    registerCallback(machine, probe)
+    ready(machine, probe)
+
+    machine ! RequestActivation()
+    probe.expectMsg(Transition(machine, ClientProxyReady, ClientProxyRemoving))
+    probe.expectMsg(ClientClosed)
+
+    probe expectTerminated machine
+  }
+
+  it should "be closed when it receives a CloseClientProxy message for a 
normal timeout case" in within(timeout) {
+    val fetch = (_: FetchRequest) => 
Future(grpc.FetchResponse(AResponse(Right(message)).serialize))
+    val activationClient = MockActivationServiceClient(fetch)
+    val client = (_: String, _: FullyQualifiedEntityName, _: String, _: Int, 
_: Boolean) => Future(activationClient)
+
+    val probe = TestProbe()
+    val machine =
+      probe.childActorOf(
+        ActivationClientProxy
+          .props(invocationNamespace.asString, fqn, rev, schedulerHost, 
rpcPort, containerId, client))
+    registerCallback(machine, probe)
+    ready(machine, probe)
+
+    machine ! CloseClientProxy
+    awaitAssert(activationClient.isClosed shouldBe true)
+
+    probe.expectMsg(Transition(machine, ClientProxyReady, ClientProxyRemoving))
+
+    machine ! RequestActivation()
+
+    probe expectMsg ClientClosed
+    probe expectTerminated machine
+  }
+
+  it should "be closed when it receives a StopClientProxy message for the case 
of graceful shutdown" in within(timeout) {
+    val fetch = (_: FetchRequest) => 
Future(grpc.FetchResponse(AResponse(Right(message)).serialize))
+    val activationClient = MockActivationServiceClient(fetch)
+    val client = (_: String, _: FullyQualifiedEntityName, _: String, _: Int, 
_: Boolean) => Future(activationClient)
+
+    val probe = TestProbe()
+    val machine =
+      probe.childActorOf(
+        ActivationClientProxy
+          .props(invocationNamespace.asString, fqn, rev, schedulerHost, 
rpcPort, containerId, client))
+    registerCallback(machine, probe)
+    ready(machine, probe)
+
+    machine ! StopClientProxy
+    awaitAssert(activationClient.isClosed shouldBe true)
+
+    probe expectMsg ClientClosed
+    probe expectTerminated machine
+  }
+
+  it should "be safely closed when the client is already closed" in 
within(timeout) {
+    val fetch = (_: FetchRequest) => 
Future(grpc.FetchResponse(AResponse(Right(message)).serialize))
+    val activationClient = MockActivationServiceClient(fetch)
+    val client = (_: String, _: FullyQualifiedEntityName, _: String, _: Int, 
_: Boolean) => Future(activationClient)
+
+    val probe = TestProbe()
+    val machine =
+      probe.childActorOf(
+        ActivationClientProxy
+          .props(invocationNamespace.asString, fqn, rev, schedulerHost, 
rpcPort, containerId, client))
+    registerCallback(machine, probe)
+    ready(machine, probe)
+
+    // close client
+    activationClient.close().futureValue
+    awaitAssert(activationClient.isClosed shouldBe true)
+
+    // close client again
+    machine ! StopClientProxy
+
+    probe expectMsg ClientClosed
+    probe expectTerminated machine
+  }
+
+  /** Registers the transition callback and expects the first message */
+  def registerCallback(c: ActorRef, probe: TestProbe) = {
+    c ! SubscribeTransitionCallBack(probe.ref)
+    probe.expectMsg(CurrentState(c, ClientProxyUninitialized))
+    probe watch c
+  }
+
+  def ready(machine: ActorRef, probe: TestProbe) = {
+    machine ! StartClient
+    probe.expectMsg(ClientCreationCompleted())
+    probe.expectMsg(Transition(machine, ClientProxyUninitialized, 
ClientProxyReady))
+  }
+
+  case class MockActivationServiceClient(customFetchActivation: FetchRequest 
=> Future[grpc.FetchResponse])
+      extends ActivationServiceClient {
+
+    var isClosed = false
+
+    override def close(): Future[Done] = {
+      isClosed = true
+      Future.successful(Done)
+    }
+
+    override def closed(): Future[Done] = close()
+
+    override def rescheduleActivation(in: RescheduleRequest): 
Future[RescheduleResponse] = {
+      Future.successful(RescheduleResponse())
+    }
+
+    override def fetchActivation(in: FetchRequest): Future[grpc.FetchResponse] 
= {
+      if (!isClosed) {
+        customFetchActivation(in)
+      } else {
+        throw new ClientClosedException()
+      }
+    }
+  }
+}
diff --git 
a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/grpc/test/ActivationServiceImplTests.scala
 
b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/grpc/test/ActivationServiceImplTests.scala
index 16c6f1c..b3838d9 100644
--- 
a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/grpc/test/ActivationServiceImplTests.scala
+++ 
b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/grpc/test/ActivationServiceImplTests.scala
@@ -21,6 +21,7 @@ import akka.actor.{Actor, ActorSystem, Props}
 import akka.testkit.{ImplicitSender, TestKit}
 import common.StreamLogging
 import org.apache.openwhisk.common.TransactionId
+import org.apache.openwhisk.core.WarmUp.warmUpAction
 import org.apache.openwhisk.core.connector.ActivationMessage
 import org.apache.openwhisk.core.entity._
 import org.apache.openwhisk.core.scheduler.grpc.ActivationServiceImpl
@@ -28,16 +29,19 @@ import org.apache.openwhisk.core.scheduler.queue.{
   ActionMismatch,
   MemoryQueueKey,
   MemoryQueueValue,
+  NoActivationMessage,
   NoMemoryQueue,
   QueuePool
 }
-import org.apache.openwhisk.grpc.{FetchRequest, FetchResponse, 
RescheduleRequest, RescheduleResponse}
+import org.apache.openwhisk.grpc.{FetchRequest, FetchResponse}
 import org.junit.runner.RunWith
 import org.scalatest.junit.JUnitRunner
 import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, FlatSpecLike, 
Matchers}
 import org.apache.openwhisk.core.scheduler.grpc.{ActivationResponse, 
GetActivation}
 import org.scalatest.concurrent.ScalaFutures
+import spray.json.JsonParser.ParsingException
 
+import scala.concurrent.{Await, Future}
 import scala.concurrent.duration._
 
 @RunWith(classOf[JUnitRunner])
@@ -58,6 +62,8 @@ class ActivationServiceImplTests
   }
   override def beforeEach = QueuePool.clear()
 
+  private def await[T](awaitable: Future[T], timeout: FiniteDuration = 
10.seconds) = Await.result(awaitable, timeout)
+
   behavior of "ActivationService"
 
   implicit val ec = system.dispatcher
@@ -80,7 +86,7 @@ class ActivationServiceImplTests
     blocking = false,
     content = None)
 
-  it should "send GetActivation message to the MemoryQueue actor" in {
+  it should "delegate the FetchRequest to a MemoryQueue" in {
 
     val mock = system.actorOf(Props(new Actor() {
       override def receive: Receive = {
@@ -93,9 +99,11 @@ class ActivationServiceImplTests
     QueuePool.put(MemoryQueueKey(testEntityPath.asString, testDoc), 
MemoryQueueValue(mock, true))
     val activationServiceImpl = ActivationServiceImpl()
 
+    val tid = TransactionId(TransactionId.generateTid())
     activationServiceImpl
       .fetchActivation(
         FetchRequest(
+          tid.serialize,
           message.user.namespace.name.asString,
           testFQN.serialize,
           testDocRevision.serialize,
@@ -104,15 +112,16 @@ class ActivationServiceImplTests
           alive = true))
       .futureValue shouldBe 
FetchResponse(ActivationResponse(Right(message)).serialize)
 
-    expectMsg(GetActivation(testFQN, testContainerId, false, None))
+    expectMsg(GetActivation(tid, testFQN, testContainerId, false, None))
   }
 
-  it should "return NoMemoryQueue if there is no queue" in {
+  it should "return without any retry if there is no such queue" in {
     val activationServiceImpl = ActivationServiceImpl()
 
     activationServiceImpl
       .fetchActivation(
         FetchRequest(
+          TransactionId(TransactionId.generateTid()).serialize,
           message.user.namespace.name.asString,
           testFQN.serialize,
           testDocRevision.serialize,
@@ -133,6 +142,7 @@ class ActivationServiceImplTests
     activationServiceImpl
       .fetchActivation(
         FetchRequest( // same doc id but with a different doc revision
+          TransactionId(TransactionId.generateTid()).serialize,
           message.user.namespace.name.asString,
           testFQN.serialize,
           DocRevision("new-one").serialize,
@@ -144,28 +154,67 @@ class ActivationServiceImplTests
     expectNoMessage(200.millis)
   }
 
-  it should "reschedule activation message to the queue" in {
+  it should "return NoActivationMessage if it is a warm-up action" in {
 
-    val mock = system.actorOf(Props(new Actor() {
-      override def receive: Receive = {
-        case message: ActivationMessage =>
-          testActor ! message
-      }
-    }))
     val activationServiceImpl = ActivationServiceImpl()
 
-    QueuePool.put(MemoryQueueKey(testEntityPath.asString, testDoc), 
MemoryQueueValue(mock, true))
+    QueuePool.put(MemoryQueueKey(testEntityPath.asString, testDoc), 
MemoryQueueValue(testActor, true))
 
     activationServiceImpl
-      .rescheduleActivation(
-        RescheduleRequest( // same doc id but with a different doc revision
+      .fetchActivation(
+        FetchRequest(
+          TransactionId(TransactionId.generateTid()).serialize,
           message.user.namespace.name.asString,
-          testFQN.serialize,
+          warmUpAction.serialize,
           testDocRevision.serialize,
-          message.serialize))
-      .futureValue shouldBe RescheduleResponse(isRescheduled = true)
+          testContainerId,
+          false,
+          alive = true))
+      .futureValue shouldBe 
FetchResponse(ActivationResponse(Left(NoActivationMessage())).serialize)
+
+    expectNoMessage(200.millis)
+  }
+
+  it should "throw parsing error if fqn can't be parsed" in {
+    val notParsableFqn = "aaaaaaaaa"
+
+    val activationServiceImpl = ActivationServiceImpl()
+
+    QueuePool.put(MemoryQueueKey(testEntityPath.asString, testDoc), 
MemoryQueueValue(testActor, true))
+
+    a[ParsingException] should be thrownBy await {
+      activationServiceImpl
+        .fetchActivation(
+          FetchRequest(
+            TransactionId(TransactionId.generateTid()).serialize,
+            message.user.namespace.name.asString,
+            notParsableFqn,
+            testDocRevision.serialize,
+            testContainerId,
+            false,
+            alive = true))
+    }
+  }
+
+  it should "throw parsing error if rev can't be parsed" in {
+    val notParsableRev = "aaaaaaaaa"
+
+    val activationServiceImpl = ActivationServiceImpl()
+
+    QueuePool.put(MemoryQueueKey(testEntityPath.asString, testDoc), 
MemoryQueueValue(testActor, true))
 
-    expectMsg(message)
+    a[ParsingException] should be thrownBy await {
+      activationServiceImpl
+        .fetchActivation(
+          FetchRequest(
+            TransactionId(TransactionId.generateTid()).serialize,
+            message.user.namespace.name.asString,
+            testFQN.serialize,
+            notParsableRev,
+            testContainerId,
+            false,
+            alive = true))
+    }
   }
 
 }

Reply via email to