This is an automated email from the ASF dual-hosted git repository.
ningyougang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/openwhisk.git
The following commit(s) were added to refs/heads/master by this push:
new 88435790c [Scheduler Enhancement] Remove deleted containers. (#5265)
88435790c is described below
commit 88435790cbed79db3f887c1918599cb9dead2590
Author: Dominic Kim <[email protected]>
AuthorDate: Tue Jul 5 11:27:52 2022 +0900
[Scheduler Enhancement] Remove deleted containers. (#5265)
* Apply ScalaFmt.
* Do not send messages to deleted containers.
* Use milliseconds as a measurement unit.
* Fix test code.
---
.../org/apache/openwhisk/common/Logging.scala | 5 +-
.../core/scheduler/queue/MemoryQueue.scala | 36 ++++++++++++---
.../scheduler/queue/test/MemoryQueueTests.scala | 54 ++++++++++++++++++++++
3 files changed, 87 insertions(+), 8 deletions(-)
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/common/Logging.scala b/common/scala/src/main/scala/org/apache/openwhisk/common/Logging.scala
index c0dbb783a..a9da001b0 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/common/Logging.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/common/Logging.scala
@@ -592,8 +592,9 @@ object LoggingMarkers {
// Time that is needed to produce message in kafka
val SCHEDULER_KAFKA = LogMarkerToken(scheduler, kafka, start)(MeasurementUnit.time.milliseconds)
val SCHEDULER_KAFKA_WAIT_TIME =
- LogMarkerToken(scheduler, "kafkaWaitTime", counter)(MeasurementUnit.none)
- def SCHEDULER_WAIT_TIME(action: String) = LogMarkerToken(scheduler, "waitTime", counter, Some(action), Map("action" -> action))(MeasurementUnit.none)
+ LogMarkerToken(scheduler, "kafkaWaitTime", counter)(MeasurementUnit.time.milliseconds)
+ def SCHEDULER_WAIT_TIME(action: String) =
+ LogMarkerToken(scheduler, "waitTime", counter, Some(action), Map("action" -> action))(MeasurementUnit.time.milliseconds)
def SCHEDULER_KEEP_ALIVE(leaseId: Long) =
LogMarkerToken(scheduler, "keepAlive", counter, None, Map("leaseId" -> leaseId.toString))(MeasurementUnit.none)
diff --git a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/MemoryQueue.scala b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/MemoryQueue.scala
index 51a674457..e4ba0022b 100644
--- a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/MemoryQueue.scala
+++ b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/MemoryQueue.scala
@@ -35,7 +35,12 @@ import org.apache.openwhisk.core.etcd.EtcdClient
import org.apache.openwhisk.core.etcd.EtcdKV.ContainerKeys.containerPrefix
import org.apache.openwhisk.core.etcd.EtcdKV.{ContainerKeys, QueueKeys, ThrottlingKeys}
import org.apache.openwhisk.core.scheduler.grpc.{GetActivation, ActivationResponse => GetActivationResponse}
-import org.apache.openwhisk.core.scheduler.message.{ContainerCreation, ContainerDeletion, FailedCreationJob, SuccessfulCreationJob}
+import org.apache.openwhisk.core.scheduler.message.{
+ ContainerCreation,
+ ContainerDeletion,
+ FailedCreationJob,
+ SuccessfulCreationJob
+}
import org.apache.openwhisk.core.scheduler.{SchedulerEndpoints, SchedulingConfig}
import org.apache.openwhisk.core.service._
import org.apache.openwhisk.http.Messages.{namespaceLimitUnderZero, tooManyConcurrentRequests}
@@ -47,7 +52,7 @@ import scala.annotation.tailrec
import scala.collection.immutable.Queue
import scala.collection.mutable
import scala.concurrent.duration._
-import scala.concurrent.{ExecutionContextExecutor, Future, Promise, duration}
+import scala.concurrent.{duration, ExecutionContextExecutor, Future, Promise}
import scala.util.{Failure, Success}
// States
@@ -453,7 +458,9 @@ class MemoryQueue(private val etcdClient: EtcdClient,
case `inProgressContainerPrefixKey` =>
creationIds -= key.split("/").last
case `existingContainerPrefixKey` =>
- containers -= key.split("/").last
+ val containerId = key.split("/").last
+ removeDeletedContainerFromRequestBuffer(containerId)
+ containers -= containerId
case _ =>
}
stay
@@ -499,6 +506,7 @@ class MemoryQueue(private val etcdClient: EtcdClient,
handleActivationRequest(request)
} else {
logging.info(this, s"Remove containerId because ${request.containerId} is not alive")
+ removeDeletedContainerFromRequestBuffer(request.containerId)
sender ! GetActivationResponse(Left(NoActivationMessage()))
containers -= request.containerId
stay
@@ -766,7 +774,9 @@ class MemoryQueue(private val etcdClient: EtcdClient,
activation.transid)
val totalTimeInScheduler = Interval(activation.transid.meta.start, Instant.now()).duration
- MetricEmitter.emitHistogramMetric(LoggingMarkers.SCHEDULER_WAIT_TIME(action.asString), totalTimeInScheduler.toMillis)
+ MetricEmitter.emitHistogramMetric(
+ LoggingMarkers.SCHEDULER_WAIT_TIME(action.asString),
+ totalTimeInScheduler.toMillis)
val activationResponse =
if (isWhiskError)
@@ -931,6 +941,16 @@ class MemoryQueue(private val etcdClient: EtcdClient,
} else None
}
+ private def removeDeletedContainerFromRequestBuffer(containerId: String): Unit = {
+ requestBuffer = requestBuffer.filter { buffer =>
+ if (buffer.containerId.drop(1) == containerId) {
+ buffer.promise.trySuccess(Left(NoActivationMessage()))
+ false
+ } else
+ true
+ }
+ }
+
private def handleActivationMessage(msg: ActivationMessage) = {
logging.info(this, s"[$invocationNamespace:$action:$stateName] got a new activation message ${msg.activationId}")(
msg.transid)
@@ -938,7 +958,9 @@ class MemoryQueue(private val etcdClient: EtcdClient,
takeUncompletedRequest()
.map { res =>
val totalTimeInScheduler = Interval(msg.transid.meta.start, Instant.now()).duration
- MetricEmitter.emitHistogramMetric(LoggingMarkers.SCHEDULER_WAIT_TIME(action.asString), totalTimeInScheduler.toMillis)
+ MetricEmitter.emitHistogramMetric(
+ LoggingMarkers.SCHEDULER_WAIT_TIME(action.asString),
+ totalTimeInScheduler.toMillis)
res.trySuccess(Right(msg))
in.decrementAndGet()
stay
@@ -960,7 +982,9 @@ class MemoryQueue(private val etcdClient: EtcdClient,
this,
s"[$invocationNamespace:$action:$stateName] Get activation request ${request.containerId}, send one message: ${msg.activationId}")
val totalTimeInScheduler = Interval(msg.transid.meta.start, Instant.now()).duration
- MetricEmitter.emitHistogramMetric(LoggingMarkers.SCHEDULER_WAIT_TIME(action.asString), totalTimeInScheduler.toMillis)
+ MetricEmitter.emitHistogramMetric(
+ LoggingMarkers.SCHEDULER_WAIT_TIME(action.asString),
+ totalTimeInScheduler.toMillis)
sender ! GetActivationResponse(Right(msg))
tryDisableActionThrottling()
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueTests.scala
index fc1eac80a..15ac5f4c3 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueTests.scala
@@ -697,6 +697,60 @@ class MemoryQueueTests
fsm.stop()
}
+ it should "not send msg to a deleted container" in {
+ val mockEtcdClient = mock[EtcdClient]
+ val probe = TestProbe()
+ val tid = TransactionId(TransactionId.generateTid())
+
+ expectDurationChecking(mockEsClient, testInvocationNamespace)
+
+ val fsm =
+ TestFSMRef(
+ new MemoryQueue(
+ mockEtcdClient,
+ durationChecker,
+ fqn,
+ mockMessaging(),
+ schedulingConfig,
+ testInvocationNamespace,
+ revision,
+ endpoints,
+ actionMetadata,
+ probe.ref,
+ probe.ref,
+ probe.ref,
+ TestProbe().ref,
+ schedulerId,
+ ack,
+ store,
+ getUserLimit,
+ checkToDropStaleActivation,
+ queueConfig))
+
+ fsm.setState(Running, RunningData(probe.ref, probe.ref))
+
+ val sender1 = TestProbe()
+ val sender2 = TestProbe()
+ fsm.tell(GetActivation(tid, fqn, "1", false, None), sender1.ref)
+ fsm.tell(GetActivation(tid, fqn, "2", false, None), sender2.ref)
+ fsm.tell(GetActivation(tid, fqn, "2", false, None, false), sender2.ref)
+ fsm ! message
+
+ // sender 1 will get a message while sender 2 will get a NoActivationMessage
+ sender1.expectMsg(GetActivationResponse(Right(message)))
+ sender2.expectMsg(GetActivationResponse(Left(NoActivationMessage())))
+ sender2.expectMsg(GetActivationResponse(Left(NoActivationMessage())))
+
+ fsm.tell(GetActivation(tid, fqn, "1", false, None), sender1.ref)
+ fsm.tell(GetActivation(tid, fqn, "2", false, None), sender2.ref)
+ fsm ! WatchEndpointRemoved(existingContainerKey, "2", "", true) // remove container2 using watch event
+ fsm ! message
+
+ // sender 1 will get a message while sender 2 will get a NoActivationMessage
+ sender1.expectMsg(GetActivationResponse(Right(message)))
+ sender2.expectMsg(GetActivationResponse(Left(NoActivationMessage())))
+ }
+
it should "send response to request according to the order of container id and warmed flag" in {
val mockEtcdClient = mock[EtcdClient]
val probe = TestProbe()