This is an automated email from the ASF dual-hosted git repository.

ningyougang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new 88435790c [Scheduler Enhancement] Remove deleted containers. (#5265)
88435790c is described below

commit 88435790cbed79db3f887c1918599cb9dead2590
Author: Dominic Kim <[email protected]>
AuthorDate: Tue Jul 5 11:27:52 2022 +0900

    [Scheduler Enhancement] Remove deleted containers. (#5265)
    
    * Apply ScalaFmt.
    
    * Do not send messages to deleted containers.
    
    * Use milliseconds as a measurement unit.
    
    * Fix test code.
---
 .../org/apache/openwhisk/common/Logging.scala      |  5 +-
 .../core/scheduler/queue/MemoryQueue.scala         | 36 ++++++++++++---
 .../scheduler/queue/test/MemoryQueueTests.scala    | 54 ++++++++++++++++++++++
 3 files changed, 87 insertions(+), 8 deletions(-)

diff --git a/common/scala/src/main/scala/org/apache/openwhisk/common/Logging.scala b/common/scala/src/main/scala/org/apache/openwhisk/common/Logging.scala
index c0dbb783a..a9da001b0 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/common/Logging.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/common/Logging.scala
@@ -592,8 +592,9 @@ object LoggingMarkers {
   // Time that is needed to produce message in kafka
   val SCHEDULER_KAFKA = LogMarkerToken(scheduler, kafka, 
start)(MeasurementUnit.time.milliseconds)
   val SCHEDULER_KAFKA_WAIT_TIME =
-    LogMarkerToken(scheduler, "kafkaWaitTime", counter)(MeasurementUnit.none)
-  def SCHEDULER_WAIT_TIME(action: String) = LogMarkerToken(scheduler, 
"waitTime", counter, Some(action), Map("action" -> 
action))(MeasurementUnit.none)
+    LogMarkerToken(scheduler, "kafkaWaitTime", 
counter)(MeasurementUnit.time.milliseconds)
+  def SCHEDULER_WAIT_TIME(action: String) =
+    LogMarkerToken(scheduler, "waitTime", counter, Some(action), Map("action" 
-> action))(MeasurementUnit.time.milliseconds)
 
   def SCHEDULER_KEEP_ALIVE(leaseId: Long) =
     LogMarkerToken(scheduler, "keepAlive", counter, None, Map("leaseId" -> 
leaseId.toString))(MeasurementUnit.none)
diff --git a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/MemoryQueue.scala b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/MemoryQueue.scala
index 51a674457..e4ba0022b 100644
--- a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/MemoryQueue.scala
+++ b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/MemoryQueue.scala
@@ -35,7 +35,12 @@ import org.apache.openwhisk.core.etcd.EtcdClient
 import org.apache.openwhisk.core.etcd.EtcdKV.ContainerKeys.containerPrefix
 import org.apache.openwhisk.core.etcd.EtcdKV.{ContainerKeys, QueueKeys, 
ThrottlingKeys}
 import org.apache.openwhisk.core.scheduler.grpc.{GetActivation, 
ActivationResponse => GetActivationResponse}
-import org.apache.openwhisk.core.scheduler.message.{ContainerCreation, 
ContainerDeletion, FailedCreationJob, SuccessfulCreationJob}
+import org.apache.openwhisk.core.scheduler.message.{
+  ContainerCreation,
+  ContainerDeletion,
+  FailedCreationJob,
+  SuccessfulCreationJob
+}
 import org.apache.openwhisk.core.scheduler.{SchedulerEndpoints, 
SchedulingConfig}
 import org.apache.openwhisk.core.service._
 import org.apache.openwhisk.http.Messages.{namespaceLimitUnderZero, 
tooManyConcurrentRequests}
@@ -47,7 +52,7 @@ import scala.annotation.tailrec
 import scala.collection.immutable.Queue
 import scala.collection.mutable
 import scala.concurrent.duration._
-import scala.concurrent.{ExecutionContextExecutor, Future, Promise, duration}
+import scala.concurrent.{duration, ExecutionContextExecutor, Future, Promise}
 import scala.util.{Failure, Success}
 
 // States
@@ -453,7 +458,9 @@ class MemoryQueue(private val etcdClient: EtcdClient,
         case `inProgressContainerPrefixKey` =>
           creationIds -= key.split("/").last
         case `existingContainerPrefixKey` =>
-          containers -= key.split("/").last
+          val containerId = key.split("/").last
+          removeDeletedContainerFromRequestBuffer(containerId)
+          containers -= containerId
         case _ =>
       }
       stay
@@ -499,6 +506,7 @@ class MemoryQueue(private val etcdClient: EtcdClient,
         handleActivationRequest(request)
       } else {
         logging.info(this, s"Remove containerId because ${request.containerId} 
is not alive")
+        removeDeletedContainerFromRequestBuffer(request.containerId)
         sender ! GetActivationResponse(Left(NoActivationMessage()))
         containers -= request.containerId
         stay
@@ -766,7 +774,9 @@ class MemoryQueue(private val etcdClient: EtcdClient,
       activation.transid)
 
     val totalTimeInScheduler = Interval(activation.transid.meta.start, 
Instant.now()).duration
-    
MetricEmitter.emitHistogramMetric(LoggingMarkers.SCHEDULER_WAIT_TIME(action.asString),
 totalTimeInScheduler.toMillis)
+    MetricEmitter.emitHistogramMetric(
+      LoggingMarkers.SCHEDULER_WAIT_TIME(action.asString),
+      totalTimeInScheduler.toMillis)
 
     val activationResponse =
       if (isWhiskError)
@@ -931,6 +941,16 @@ class MemoryQueue(private val etcdClient: EtcdClient,
     } else None
   }
 
+  private def removeDeletedContainerFromRequestBuffer(containerId: String): 
Unit = {
+    requestBuffer = requestBuffer.filter { buffer =>
+      if (buffer.containerId.drop(1) == containerId) {
+        buffer.promise.trySuccess(Left(NoActivationMessage()))
+        false
+      } else
+        true
+    }
+  }
+
   private def handleActivationMessage(msg: ActivationMessage) = {
     logging.info(this, s"[$invocationNamespace:$action:$stateName] got a new 
activation message ${msg.activationId}")(
       msg.transid)
@@ -938,7 +958,9 @@ class MemoryQueue(private val etcdClient: EtcdClient,
     takeUncompletedRequest()
       .map { res =>
         val totalTimeInScheduler = Interval(msg.transid.meta.start, 
Instant.now()).duration
-        
MetricEmitter.emitHistogramMetric(LoggingMarkers.SCHEDULER_WAIT_TIME(action.asString),
 totalTimeInScheduler.toMillis)
+        MetricEmitter.emitHistogramMetric(
+          LoggingMarkers.SCHEDULER_WAIT_TIME(action.asString),
+          totalTimeInScheduler.toMillis)
         res.trySuccess(Right(msg))
         in.decrementAndGet()
         stay
@@ -960,7 +982,9 @@ class MemoryQueue(private val etcdClient: EtcdClient,
         this,
         s"[$invocationNamespace:$action:$stateName] Get activation request 
${request.containerId}, send one message: ${msg.activationId}")
       val totalTimeInScheduler = Interval(msg.transid.meta.start, 
Instant.now()).duration
-      
MetricEmitter.emitHistogramMetric(LoggingMarkers.SCHEDULER_WAIT_TIME(action.asString),
 totalTimeInScheduler.toMillis)
+      MetricEmitter.emitHistogramMetric(
+        LoggingMarkers.SCHEDULER_WAIT_TIME(action.asString),
+        totalTimeInScheduler.toMillis)
 
       sender ! GetActivationResponse(Right(msg))
       tryDisableActionThrottling()
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueTests.scala
index fc1eac80a..15ac5f4c3 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueTests.scala
@@ -697,6 +697,60 @@ class MemoryQueueTests
     fsm.stop()
   }
 
+  it should "not send msg to a deleted container" in {
+    val mockEtcdClient = mock[EtcdClient]
+    val probe = TestProbe()
+    val tid = TransactionId(TransactionId.generateTid())
+
+    expectDurationChecking(mockEsClient, testInvocationNamespace)
+
+    val fsm =
+      TestFSMRef(
+        new MemoryQueue(
+          mockEtcdClient,
+          durationChecker,
+          fqn,
+          mockMessaging(),
+          schedulingConfig,
+          testInvocationNamespace,
+          revision,
+          endpoints,
+          actionMetadata,
+          probe.ref,
+          probe.ref,
+          probe.ref,
+          TestProbe().ref,
+          schedulerId,
+          ack,
+          store,
+          getUserLimit,
+          checkToDropStaleActivation,
+          queueConfig))
+
+    fsm.setState(Running, RunningData(probe.ref, probe.ref))
+
+    val sender1 = TestProbe()
+    val sender2 = TestProbe()
+    fsm.tell(GetActivation(tid, fqn, "1", false, None), sender1.ref)
+    fsm.tell(GetActivation(tid, fqn, "2", false, None), sender2.ref)
+    fsm.tell(GetActivation(tid, fqn, "2", false, None, false), sender2.ref)
+    fsm ! message
+
+    // sender 1 will get a message while sender 2 will get a 
NoActivationMessage
+    sender1.expectMsg(GetActivationResponse(Right(message)))
+    sender2.expectMsg(GetActivationResponse(Left(NoActivationMessage())))
+    sender2.expectMsg(GetActivationResponse(Left(NoActivationMessage())))
+
+    fsm.tell(GetActivation(tid, fqn, "1", false, None), sender1.ref)
+    fsm.tell(GetActivation(tid, fqn, "2", false, None), sender2.ref)
+    fsm ! WatchEndpointRemoved(existingContainerKey, "2", "", true) // remove 
container2 using watch event
+    fsm ! message
+
+    // sender 1 will get a message while sender 2 will get a 
NoActivationMessage
+    sender1.expectMsg(GetActivationResponse(Right(message)))
+    sender2.expectMsg(GetActivationResponse(Left(NoActivationMessage())))
+  }
+
   it should "send response to request according to the order of container id 
and warmed flag" in {
     val mockEtcdClient = mock[EtcdClient]
     val probe = TestProbe()

Reply via email to